提交 b6957099 编写于 作者: A avalon566

For checkstyle

上级 c60e6afb
......@@ -22,6 +22,8 @@ import info.avalon566.shardingscaling.sync.core.Channel;
import info.avalon566.shardingscaling.sync.core.FinishedRecord;
import info.avalon566.shardingscaling.sync.core.RdbmsConfiguration;
import info.avalon566.shardingscaling.sync.core.Reader;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
......@@ -35,38 +37,38 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
/**
* generic jdbc reader implement.
* @author avalon566
*/
@Slf4j
public abstract class AbstractJdbcReader extends AbstractRunner implements Reader {
protected final RdbmsConfiguration rdbmsConfiguration;
@Getter(AccessLevel.PROTECTED)
private final RdbmsConfiguration rdbmsConfiguration;
@Setter
private Channel channel;
public AbstractJdbcReader(RdbmsConfiguration rdbmsConfiguration) {
public AbstractJdbcReader(final RdbmsConfiguration rdbmsConfiguration) {
this.rdbmsConfiguration = rdbmsConfiguration;
}
@Override
public void run() {
public final void run() {
start();
read(channel);
}
@Override
public void read(Channel channel) {
public final void read(final Channel channel) {
try {
Connection conn = DriverManager.getConnection(
rdbmsConfiguration.getJdbcUrl(),
rdbmsConfiguration.getUsername(),
rdbmsConfiguration.getPassword());
var sql = String.format("select * from %s %s", rdbmsConfiguration.getTableName(), rdbmsConfiguration.getWhereCondition());
var ps = conn.prepareStatement(sql, TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
var ps = conn.prepareStatement(sql, java.sql.ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(Integer.MIN_VALUE);
ps.setFetchDirection(ResultSet.FETCH_REVERSE);
var rs = ps.executeQuery();
......@@ -88,8 +90,11 @@ public abstract class AbstractJdbcReader extends AbstractRunner implements Reade
}
}
/**
* generic jdbc reader split implement.
*/
@Override
public List<RdbmsConfiguration> split(int concurrency) {
public List<RdbmsConfiguration> split(final int concurrency) {
var primaryKeys = new DbMetaDataUtil(rdbmsConfiguration).getPrimaryKeys(rdbmsConfiguration.getTableName());
if (primaryKeys == null || primaryKeys.size() == 0) {
log.warn("{} 表主键不存在, 不支持并发执行", rdbmsConfiguration.getTableName());
......@@ -101,13 +106,9 @@ public abstract class AbstractJdbcReader extends AbstractRunner implements Reade
}
var metaData = new DbMetaDataUtil(rdbmsConfiguration).getColumNames(rdbmsConfiguration.getTableName());
var index = DbMetaDataUtil.findColumnIndex(metaData, primaryKeys.get(0));
try {
if (Types.INTEGER != metaData.get(index).getColumnType()) {
log.warn("{} 主键不是整形,不支持并发执行", rdbmsConfiguration.getTableName());
return Arrays.asList(rdbmsConfiguration);
}
} catch (Exception e) {
throw new RuntimeException(e);
if (Types.INTEGER != metaData.get(index).getColumnType()) {
log.warn("{} 主键不是整形,不支持并发执行", rdbmsConfiguration.getTableName());
return Arrays.asList(rdbmsConfiguration);
}
var pk = primaryKeys.get(0);
try {
......@@ -131,8 +132,8 @@ public abstract class AbstractJdbcReader extends AbstractRunner implements Reade
}
return configs;
}
} catch (Exception e) {
} catch (SQLException e) {
throw new RuntimeException("getTableNames error", e);
}
}
}
\ No newline at end of file
}
......@@ -17,11 +17,16 @@
package info.avalon566.shardingscaling.sync.jdbc;
import info.avalon566.shardingscaling.sync.core.*;
import info.avalon566.shardingscaling.sync.core.AbstractRunner;
import info.avalon566.shardingscaling.sync.core.Channel;
import info.avalon566.shardingscaling.sync.core.FinishedRecord;
import info.avalon566.shardingscaling.sync.core.RdbmsConfiguration;
import info.avalon566.shardingscaling.sync.core.Writer;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
......@@ -29,6 +34,7 @@ import java.sql.SQLIntegrityConstraintViolationException;
import java.util.ArrayList;
/**
* generic jdbc writer implement.
* @author avalon566
*/
@Slf4j
......@@ -43,20 +49,20 @@ public abstract class AbstractJdbcWriter extends AbstractRunner implements Write
@Setter
private Channel channel;
public AbstractJdbcWriter(RdbmsConfiguration rdbmsConfiguration) {
public AbstractJdbcWriter(final RdbmsConfiguration rdbmsConfiguration) {
this.rdbmsConfiguration = rdbmsConfiguration;
this.dbMetaDataUtil = new DbMetaDataUtil(rdbmsConfiguration);
this.sqlBuilder = new SqlBuilder(rdbmsConfiguration);
}
@Override
public void run() {
public final void run() {
start();
write(channel);
}
@Override
public void write(Channel channel) {
public final void write(final Channel channel) {
var buffer = new ArrayList<DataRecord>(2000);
var lastFlushTime = System.currentTimeMillis();
try {
......@@ -83,69 +89,78 @@ public abstract class AbstractJdbcWriter extends AbstractRunner implements Write
if (0 < buffer.size()) {
flush(rdbmsConfiguration, buffer);
}
} catch (Exception ex) {
} catch (SQLException ex) {
log.error(null, ex);
throw new RuntimeException(ex);
}
}
private void flush(RdbmsConfiguration config, ArrayList<DataRecord> buffer) throws SQLException {
private void flush(final RdbmsConfiguration config, final ArrayList<DataRecord> buffer) throws SQLException {
try (var connection = DriverManager.getConnection(config.getJdbcUrl(), config.getUsername(), config.getPassword())) {
connection.setAutoCommit(false);
for (int ij = 0; ij < buffer.size(); ij++) {
var record = buffer.get(ij);
if ("bootstrap-insert".equals(record.getType())
|| "insert".equals(record.getType())) {
var insertSql = sqlBuilder.buildInsertSql(record.getTableName());
PreparedStatement ps = connection.prepareStatement(insertSql);
ps.setQueryTimeout(30);
for (DataRecord r : buffer) {
try {
for (int i = 0; i < r.getColumnCount(); i++) {
ps.setObject(i + 1, r.getColumn(i).getValue());
}
ps.execute();
} catch (SQLIntegrityConstraintViolationException ex) {
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
executeInsert(connection, record);
} else if ("update".equals(record.getType())) {
var metaData = dbMetaDataUtil.getColumNames(record.getTableName());
var primaryKeys = dbMetaDataUtil.getPrimaryKeys(record.getTableName());
var updatedColumns = new StringBuilder();
var values = new ArrayList<Column>();
for (int i = 0; i < metaData.size(); i++) {
if (record.getColumn(i).isUpdated()) {
updatedColumns.append(String.format("%s = ?,", metaData.get(i).getColumnName()));
values.add(record.getColumn(i));
}
}
for (int i = 0; i < primaryKeys.size(); i++) {
var index = DbMetaDataUtil.findColumnIndex(metaData, primaryKeys.get(i));
values.add(record.getColumn(index));
}
var updateSql = sqlBuilder.buildUpdateSql(record.getTableName());
var sql = String.format(updateSql, updatedColumns.substring(0, updatedColumns.length() - 1));
PreparedStatement ps = connection.prepareStatement(sql);
for (int i = 0; i < values.size(); i++) {
ps.setObject(i + 1, values.get(i).getValue());
}
ps.execute();
executeUpdate(connection, record);
} else if ("delete".equals(record.getType())) {
var metaData = dbMetaDataUtil.getColumNames(record.getTableName());
var primaryKeys = dbMetaDataUtil.getPrimaryKeys(record.getTableName());
var deleteSql = sqlBuilder.buildDeleteSql(record.getTableName());
PreparedStatement ps = connection.prepareStatement(deleteSql);
for (int i = 0; i < primaryKeys.size(); i++) {
var index = DbMetaDataUtil.findColumnIndex(metaData, primaryKeys.get(i));
ps.setObject(i + 1, record.getColumn(index).getValue());
}
ps.execute();
executeDelete(connection, record);
}
}
connection.commit();
}
buffer.clear();
}
}
\ No newline at end of file
private void executeInsert(final Connection connection, final DataRecord record) throws SQLException {
var insertSql = sqlBuilder.buildInsertSql(record.getTableName());
PreparedStatement ps = connection.prepareStatement(insertSql);
ps.setQueryTimeout(30);
try {
for (int i = 0; i < record.getColumnCount(); i++) {
ps.setObject(i + 1, record.getColumn(i).getValue());
}
ps.execute();
} catch (SQLIntegrityConstraintViolationException ex) {
// ignore
}
}
private void executeUpdate(final Connection connection, final DataRecord record) throws SQLException {
var metaData = dbMetaDataUtil.getColumNames(record.getTableName());
var primaryKeys = dbMetaDataUtil.getPrimaryKeys(record.getTableName());
var updatedColumns = new StringBuilder();
var values = new ArrayList<Column>();
for (int i = 0; i < metaData.size(); i++) {
if (record.getColumn(i).isUpdated()) {
updatedColumns.append(String.format("%s = ?,", metaData.get(i).getColumnName()));
values.add(record.getColumn(i));
}
}
for (int i = 0; i < primaryKeys.size(); i++) {
var index = DbMetaDataUtil.findColumnIndex(metaData, primaryKeys.get(i));
values.add(record.getColumn(index));
}
var updateSql = sqlBuilder.buildUpdateSql(record.getTableName());
var sql = String.format(updateSql, updatedColumns.substring(0, updatedColumns.length() - 1));
PreparedStatement ps = connection.prepareStatement(sql);
for (int i = 0; i < values.size(); i++) {
ps.setObject(i + 1, values.get(i).getValue());
}
ps.execute();
}
private void executeDelete(final Connection connection, final DataRecord record) throws SQLException {
var metaData = dbMetaDataUtil.getColumNames(record.getTableName());
var primaryKeys = dbMetaDataUtil.getPrimaryKeys(record.getTableName());
var deleteSql = sqlBuilder.buildDeleteSql(record.getTableName());
PreparedStatement ps = connection.prepareStatement(deleteSql);
for (int i = 0; i < primaryKeys.size(); i++) {
var index = DbMetaDataUtil.findColumnIndex(metaData, primaryKeys.get(i));
ps.setObject(i + 1, record.getColumn(index).getValue());
}
ps.execute();
}
}
......@@ -38,7 +38,7 @@ public final class MySQLJdbcReader extends AbstractJdbcReader {
@Override
public List<RdbmsConfiguration> split(final int concurrency) {
rdbmsConfiguration.setJdbcUrl(fixMysqlUrl(rdbmsConfiguration.getJdbcUrl()));
getRdbmsConfiguration().setJdbcUrl(fixMysqlUrl(getRdbmsConfiguration().getJdbcUrl()));
return super.split(concurrency);
}
......
......@@ -20,6 +20,7 @@ package info.avalon566.shardingscaling.sync.mysql.binlog.event;
import lombok.Data;
/**
* Abstract binlog event.
* @author avalon566
*/
@Data
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册