提交 03f67135 编写于 作者: A avalon566 提交者: 杨翊 SionYang

[Sharding Scaling] Refactor to support PostgreSQL (#3899)

* Refactor AbstractJdbcReader & Fix stream resultset

* Support PostgreSQL identifier quote string

* Refactor DistributionChannel, avoid deadlock

* Refactor AbstractJdbcReader

* Refactor SqlBuilder

* Refactor
上级 f3ea11d8
......@@ -24,11 +24,12 @@ import org.apache.shardingsphere.shardingscaling.core.execute.executor.record.Pl
import org.apache.shardingsphere.shardingscaling.core.execute.executor.record.Record;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
......@@ -54,7 +55,7 @@ public final class DistributionChannel implements Channel {
private final AckCallback ackCallback;
private List<Record> toBeAcknowledgeRecords = new LinkedList<>();
private Queue<Record> toBeAcknowledgeRecords = new ConcurrentLinkedQueue<>();
private Map<LogPosition, Record> pendingAcknowledgeRecords = new ConcurrentHashMap<>();
......@@ -82,13 +83,12 @@ public final class DistributionChannel implements Channel {
private void ackRecords0() {
synchronized (DistributionChannel.this) {
Iterator<Record> iterator = toBeAcknowledgeRecords.iterator();
List<Record> result = new LinkedList<>();
while (iterator.hasNext()) {
Record record = iterator.next();
while (!toBeAcknowledgeRecords.isEmpty()) {
Record record = toBeAcknowledgeRecords.peek();
if (pendingAcknowledgeRecords.containsKey(record.getLogPosition())) {
result.add(record);
iterator.remove();
toBeAcknowledgeRecords.poll();
pendingAcknowledgeRecords.remove(record.getLogPosition());
} else {
break;
......@@ -101,7 +101,7 @@ public final class DistributionChannel implements Channel {
}
@Override
public synchronized void pushRecord(final Record record) throws InterruptedException {
public void pushRecord(final Record record) throws InterruptedException {
if (FinishedRecord.class.equals(record.getClass())) {
// broadcast
for (Map.Entry<String, MemoryChannel> entry : channels.entrySet()) {
......@@ -127,7 +127,7 @@ public final class DistributionChannel implements Channel {
}
@Override
public synchronized void ack() {
public void ack() {
findChannel().ack();
}
......@@ -149,15 +149,21 @@ public final class DistributionChannel implements Channel {
private void checkAssignment(final String threadId) {
if (!channelAssignment.containsKey(threadId)) {
synchronized (this) {
for (Map.Entry<String, MemoryChannel> entry : channels.entrySet()) {
if (!channelAssignment.containsValue(entry.getKey())) {
channelAssignment.put(threadId, entry.getKey());
}
if (!channelAssignment.containsKey(threadId)) {
assignmentChannel(threadId);
}
}
}
}
/**
 * Bind the first not-yet-assigned channel to the given thread.
 *
 * @param threadId identifier of the thread to assign a channel to
 */
private void assignmentChannel(final String threadId) {
    for (Map.Entry<String, MemoryChannel> entry : channels.entrySet()) {
        if (!channelAssignment.containsValue(entry.getKey())) {
            channelAssignment.put(threadId, entry.getKey());
            // Stop after the first assignment: without this return, every later
            // unassigned channel would overwrite the mapping just created,
            // leaving earlier channels permanently unclaimed.
            return;
        }
    }
}
private class SingleChannelAckCallback implements AckCallback {
@Override
......
......@@ -38,7 +38,6 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
/**
* Generic JDBC reader implementation.
......@@ -48,15 +47,15 @@ import java.sql.Types;
*/
@Slf4j
public abstract class AbstractJdbcReader extends AbstractSyncRunner implements JdbcReader {
@Getter(AccessLevel.PROTECTED)
private final RdbmsConfiguration rdbmsConfiguration;
private final DataSourceFactory dataSourceFactory;
@Setter
private Channel channel;
public AbstractJdbcReader(final RdbmsConfiguration rdbmsConfiguration, final DataSourceFactory dataSourceFactory) {
if (!JdbcDataSourceConfiguration.class.equals(rdbmsConfiguration.getDataSourceConfiguration().getClass())) {
throw new UnsupportedOperationException("AbstractJdbcReader only support JdbcDataSourceConfiguration");
......@@ -64,20 +63,19 @@ public abstract class AbstractJdbcReader extends AbstractSyncRunner implements J
this.rdbmsConfiguration = rdbmsConfiguration;
this.dataSourceFactory = dataSourceFactory;
}
@Override
public final void run() {
// Mark this sync runner as running before entering the read loop.
start();
// Stream all source rows into the channel injected via the setter.
read(channel);
}
@Override
public final void read(final Channel channel) {
try (Connection conn = dataSourceFactory.getDataSource(rdbmsConfiguration.getDataSourceConfiguration()).getConnection()) {
String sql = String.format("select * from %s %s", rdbmsConfiguration.getTableName(), rdbmsConfiguration.getWhereCondition());
PreparedStatement ps = conn.prepareStatement(sql, java.sql.ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(100);
ps.setFetchDirection(ResultSet.FETCH_REVERSE);
ps.setFetchSize(Integer.MIN_VALUE);
ResultSet rs = ps.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
while (isRunning() && rs.next()) {
......@@ -85,14 +83,7 @@ public abstract class AbstractJdbcReader extends AbstractSyncRunner implements J
record.setType("bootstrap-insert");
record.setTableName(rdbmsConfiguration.getTableNameMap().get(rdbmsConfiguration.getTableName()));
for (int i = 1; i <= metaData.getColumnCount(); i++) {
if (Types.TIME == rs.getMetaData().getColumnType(i)
|| Types.DATE == rs.getMetaData().getColumnType(i)
|| Types.TIMESTAMP == rs.getMetaData().getColumnType(i)) {
// fix: jdbc Time objects represent a wall-clock time and not a duration as MySQL treats them
record.addColumn(new Column(rs.getString(i), true));
} else {
record.addColumn(new Column(rs.getObject(i), true));
}
record.addColumn(new Column(readValue(rs, i), true));
}
pushRecord(record);
}
......@@ -102,7 +93,19 @@ public abstract class AbstractJdbcReader extends AbstractSyncRunner implements J
pushRecord(new FinishedRecord(new NopLogPosition()));
}
}
/**
 * Read a single column value from the current row of the {@code ResultSet}.
 * Base implementation returns the driver's default object mapping; subclasses
 * may override to apply database-specific conversions.
 *
 * @param resultSet result set positioned on the row being read
 * @param index 1-based index of the column to read
 * @return the column value as a plain Java object
 * @throws SQLException sql exception
 */
protected Object readValue(final ResultSet resultSet, final int index) throws SQLException {
return resultSet.getObject(index);
}
private void pushRecord(final Record record) {
try {
channel.pushRecord(record);
......
......@@ -47,31 +47,40 @@ import java.util.List;
*/
@Slf4j
public abstract class AbstractJdbcWriter extends AbstractSyncRunner implements Writer {
private final RdbmsConfiguration rdbmsConfiguration;
private final DataSourceFactory dataSourceFactory;
private final SqlBuilder sqlBuilder;
private final AbstractSqlBuilder sqlBuilder;
private DbMetaDataUtil dbMetaDataUtil;
@Setter
private Channel channel;
/**
 * Create a JDBC writer bound to the given target configuration.
 *
 * @param rdbmsConfiguration target database configuration
 * @param dataSourceFactory factory used to resolve the target data source
 */
public AbstractJdbcWriter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceFactory dataSourceFactory) {
    this.rdbmsConfiguration = rdbmsConfiguration;
    this.dataSourceFactory = dataSourceFactory;
    // Resolve the data source once and share it between the metadata util and the sql builder.
    DataSource dataSource = dataSourceFactory.getDataSource(rdbmsConfiguration.getDataSourceConfiguration());
    dbMetaDataUtil = new DbMetaDataUtil(dataSource);
    // NOTE(review): createSqlBuilder is an overridable method invoked from the constructor;
    // implementations must not rely on subclass state initialized after super() returns.
    sqlBuilder = createSqlBuilder(dataSource);
}
/**
 * Create the database-specific sql builder for this writer.
 *
 * NOTE(review): invoked from the constructor, so implementations must not
 * depend on subclass state that is initialized after construction.
 *
 * @param dataSource data source used by the builder to load table metadata
 * @return sql builder producing statements in the target dialect
 */
public abstract AbstractSqlBuilder createSqlBuilder(DataSource dataSource);
@Override
public final void run() {
// Mark this sync runner as running before entering the write loop.
start();
// Drain records from the channel injected via the setter and flush them to the target.
write(channel);
}
@Override
public final void write(final Channel channel) {
try {
......@@ -90,7 +99,7 @@ public abstract class AbstractJdbcWriter extends AbstractSyncRunner implements W
throw new SyncTaskExecuteException(ex);
}
}
private void flush(final DataSource dataSource, final List<Record> buffer) throws SQLException {
try (Connection connection = dataSource.getConnection()) {
connection.setAutoCommit(false);
......@@ -110,7 +119,7 @@ public abstract class AbstractJdbcWriter extends AbstractSyncRunner implements W
connection.commit();
}
}
private void executeInsert(final Connection connection, final DataRecord record) throws SQLException {
String insertSql = sqlBuilder.buildInsertSql(record.getTableName());
PreparedStatement ps = connection.prepareStatement(insertSql);
......@@ -124,7 +133,7 @@ public abstract class AbstractJdbcWriter extends AbstractSyncRunner implements W
// ignore
}
}
private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
List<ColumnMetaData> metaData = dbMetaDataUtil.getColumnNames(record.getTableName());
List<String> primaryKeys = dbMetaDataUtil.getPrimaryKeys(record.getTableName());
......@@ -148,7 +157,7 @@ public abstract class AbstractJdbcWriter extends AbstractSyncRunner implements W
}
ps.execute();
}
private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
List<ColumnMetaData> metaData = dbMetaDataUtil.getColumnNames(record.getTableName());
List<String> primaryKeys = dbMetaDataUtil.getPrimaryKeys(record.getTableName());
......
......@@ -20,36 +20,31 @@ package org.apache.shardingsphere.shardingscaling.core.execute.executor.writer;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.shardingsphere.shardingscaling.core.metadata.ColumnMetaData;
import org.apache.shardingsphere.shardingscaling.core.util.DbMetaDataUtil;
import javax.sql.DataSource;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.shardingsphere.shardingscaling.core.util.DbMetaDataUtil;
import org.apache.shardingsphere.shardingscaling.core.metadata.ColumnMetaData;
/**
* Sql builder.
* Abstract SQL builder.
*
* @author avalon566
*/
public final class SqlBuilder {
private static final String LEFT_ESCAPE_QUOTE = "`";
private static final String RIGHT_ESCAPE_QUOTE = "`";
public abstract class AbstractSqlBuilder {
private static final String INSERT_SQL_CACHE_KEY_PREFIX = "INSERT_";
private static final String UPDATE_SQL_CACHE_KEY_PREFIX = "UPDATE_";
private static final String DELETE_SQL_CACHE_KEY_PREFIX = "DELETE_";
private final LoadingCache<String, String> sqlCache;
private final DbMetaDataUtil dbMetaDataUtil;
public SqlBuilder(final DataSource dataSource) {
public AbstractSqlBuilder(final DataSource dataSource) {
this.dbMetaDataUtil = new DbMetaDataUtil(dataSource);
sqlCache = CacheBuilder.newBuilder()
.maximumSize(64)
......@@ -66,6 +61,20 @@ public final class SqlBuilder {
}
});
}
/**
 * Get left identifier quote string.
 *
 * @return the character(s) placed before a table or column identifier,
 *         e.g. a backquote for MySQL or a double quote for PostgreSQL
 */
protected abstract String getLeftIdentifierQuoteString();
/**
 * Get right identifier quote string.
 *
 * @return the character(s) placed after a table or column identifier,
 *         e.g. a backquote for MySQL or a double quote for PostgreSQL
 */
protected abstract String getRightIdentifierQuoteString();
/**
* Build insert sql.
......@@ -114,31 +123,31 @@ public final class SqlBuilder {
StringBuilder columns = new StringBuilder();
StringBuilder holder = new StringBuilder();
for (ColumnMetaData each : metaData) {
columns.append(String.format("%s%s%s,", LEFT_ESCAPE_QUOTE, each.getColumnName(), RIGHT_ESCAPE_QUOTE));
columns.append(String.format("%s%s%s,", getLeftIdentifierQuoteString(), each.getColumnName(), getRightIdentifierQuoteString()));
holder.append("?,");
}
columns.setLength(columns.length() - 1);
holder.setLength(holder.length() - 1);
return String.format("INSERT INTO %s%s%s(%s) VALUES(%s)", LEFT_ESCAPE_QUOTE, tableName, RIGHT_ESCAPE_QUOTE, columns.toString(), holder.toString());
return String.format("INSERT INTO %s%s%s(%s) VALUES(%s)", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), columns.toString(), holder.toString());
}
/**
 * Build a DELETE statement keyed on the table's primary key columns.
 *
 * @param tableName table to delete from
 * @return parameterized DELETE sql
 */
private String buildDeleteSqlInternal(final String tableName) {
    List<String> primaryKeys = dbMetaDataUtil.getPrimaryKeys(tableName);
    StringBuilder where = new StringBuilder();
    // Join predicates with AND: the previous comma separator produced invalid SQL
    // ("pk1 = ?,pk2 = ?") for tables with composite primary keys.
    for (String each : primaryKeys) {
        if (where.length() > 0) {
            where.append(" AND ");
        }
        where.append(String.format("%s%s%s = ?", getLeftIdentifierQuoteString(), each, getRightIdentifierQuoteString()));
    }
    return String.format("DELETE FROM %s%s%s WHERE %s", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), where.toString());
}
/**
 * Build an UPDATE statement template keyed on the table's primary key columns.
 * The SET clause is left as a {@code %s} placeholder for the caller to fill in.
 *
 * @param tableName table to update
 * @return parameterized UPDATE sql template
 */
private String buildUpdateSqlInternal(final String tableName) {
    List<String> primaryKeys = dbMetaDataUtil.getPrimaryKeys(tableName);
    StringBuilder where = new StringBuilder();
    // Join predicates with AND: the previous comma separator produced invalid SQL
    // ("pk1 = ?,pk2 = ?") for tables with composite primary keys.
    for (String each : primaryKeys) {
        if (where.length() > 0) {
            where.append(" AND ");
        }
        where.append(String.format("%s%s%s = ?", getLeftIdentifierQuoteString(), each, getRightIdentifierQuoteString()));
    }
    return String.format("UPDATE %s%s%s SET %%s WHERE %s", getLeftIdentifierQuoteString(), tableName, getRightIdentifierQuoteString(), where.toString());
}
}
......@@ -23,6 +23,9 @@ import org.apache.shardingsphere.shardingscaling.core.execute.executor.reader.Ab
import org.apache.shardingsphere.shardingscaling.core.metadata.JdbcUri;
import org.apache.shardingsphere.shardingscaling.core.util.DataSourceFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Map;
/**
......@@ -31,13 +34,13 @@ import java.util.Map;
* @author avalon566
*/
public final class MySQLJdbcReader extends AbstractJdbcReader {
/**
 * Create a MySQL reader and normalize the configured JDBC url so that
 * required MySQL driver parameters are always present.
 *
 * @param rdbmsConfiguration source database configuration
 * @param dataSourceFactory factory used to resolve the source data source
 */
public MySQLJdbcReader(final RdbmsConfiguration rdbmsConfiguration, final DataSourceFactory dataSourceFactory) {
    super(rdbmsConfiguration, dataSourceFactory);
    final JdbcDataSourceConfiguration dataSourceConfig = (JdbcDataSourceConfiguration) getRdbmsConfiguration().getDataSourceConfiguration();
    dataSourceConfig.setJdbcUrl(fixMysqlUrl(dataSourceConfig.getJdbcUrl()));
}
private String formatMysqlParams(final Map<String, String> params) {
StringBuilder result = new StringBuilder();
for (Map.Entry<String, String> entry : params.entrySet()) {
......@@ -50,16 +53,32 @@ public final class MySQLJdbcReader extends AbstractJdbcReader {
result.deleteCharAt(result.length() - 1);
return result.toString();
}
/**
 * Rebuild the JDBC url with normalized MySQL connection parameters.
 *
 * @param url original JDBC url
 * @return url with adjusted parameter string
 */
private String fixMysqlUrl(final String url) {
    final JdbcUri jdbcUri = new JdbcUri(url);
    final String params = fixMysqlParams(jdbcUri.getParameters());
    return String.format("jdbc:%s://%s/%s?%s", jdbcUri.getScheme(), jdbcUri.getHost(), jdbcUri.getDatabase(), params);
}
/**
 * Ensure defaults for MySQL connection parameters the sync process depends on.
 * Currently forces {@code yearIsDateType=false} unless the caller already set it.
 *
 * @param parameters mutable url parameter map; may be modified in place
 * @return formatted parameter string
 */
private String fixMysqlParams(final Map<String, String> parameters) {
    final String yearKey = "yearIsDateType";
    if (!parameters.containsKey(yearKey)) {
        parameters.put(yearKey, "false");
    }
    return formatMysqlParams(parameters);
}
@Override
public Object readValue(final ResultSet resultSet, final int index) throws SQLException {
    final int columnType = resultSet.getMetaData().getColumnType(index);
    // Temporal columns are read as strings: jdbc Time objects represent a wall-clock
    // time and not a duration as MySQL treats them.
    return isDateTimeValue(columnType) ? resultSet.getString(index) : resultSet.getObject(index);
}
/**
 * Whether the given JDBC type code is a temporal type needing string extraction.
 *
 * @param columnType a {@link java.sql.Types} constant
 * @return true for TIME, DATE and TIMESTAMP columns
 */
private boolean isDateTimeValue(final int columnType) {
    switch (columnType) {
        case Types.TIME:
        case Types.DATE:
        case Types.TIMESTAMP:
            return true;
        default:
            return false;
    }
}
}
......@@ -19,17 +19,36 @@ package org.apache.shardingsphere.shardingscaling.mysql;
import org.apache.shardingsphere.shardingscaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.writer.AbstractJdbcWriter;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.writer.AbstractSqlBuilder;
import org.apache.shardingsphere.shardingscaling.core.util.DataSourceFactory;
import javax.sql.DataSource;
/**
* MySQL writer.
*
* @author avalon566
* @author yangyi
*/
public class MySQLWriter extends AbstractJdbcWriter {
public final class MySQLWriter extends AbstractJdbcWriter {

    public MySQLWriter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceFactory dataSourceFactory) {
        super(rdbmsConfiguration, dataSourceFactory);
    }

    @Override
    public AbstractSqlBuilder createSqlBuilder(final DataSource dataSource) {
        return new MySQLSqlBuilder(dataSource);
    }

    /**
     * Sql builder that quotes identifiers with backquotes, as MySQL requires.
     */
    private static final class MySQLSqlBuilder extends AbstractSqlBuilder {

        MySQLSqlBuilder(final DataSource dataSource) {
            super(dataSource);
        }

        @Override
        public String getLeftIdentifierQuoteString() {
            return "`";
        }

        @Override
        public String getRightIdentifierQuoteString() {
            return "`";
        }
    }
}
......@@ -19,17 +19,36 @@ package org.apache.shardingsphere.shardingscaling.postgresql;
import org.apache.shardingsphere.shardingscaling.core.config.RdbmsConfiguration;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.writer.AbstractJdbcWriter;
import org.apache.shardingsphere.shardingscaling.core.execute.executor.writer.AbstractSqlBuilder;
import org.apache.shardingsphere.shardingscaling.core.util.DataSourceFactory;
import javax.sql.DataSource;
/**
* PostgreSQL writer.
*
* @author avalon566
*/
public class PostgreSQLWriter extends AbstractJdbcWriter {
public final class PostgreSQLWriter extends AbstractJdbcWriter {

    public PostgreSQLWriter(final RdbmsConfiguration rdbmsConfiguration, final DataSourceFactory dataSourceFactory) {
        super(rdbmsConfiguration, dataSourceFactory);
    }

    @Override
    public AbstractSqlBuilder createSqlBuilder(final DataSource dataSource) {
        return new PostgreSQLSqlBuilder(dataSource);
    }

    /**
     * Sql builder that quotes identifiers with double quotes, the SQL-standard
     * form PostgreSQL uses.
     */
    private static final class PostgreSQLSqlBuilder extends AbstractSqlBuilder {

        PostgreSQLSqlBuilder(final DataSource dataSource) {
            super(dataSource);
        }

        @Override
        public String getLeftIdentifierQuoteString() {
            return "\"";
        }

        @Override
        public String getRightIdentifierQuoteString() {
            return "\"";
        }
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册