diff --git a/src/main/java/info/avalon566/shardingscaling/job/DatabaseSyncJob.java b/src/main/java/info/avalon566/shardingscaling/job/DatabaseSyncJob.java index 8206220f6e85a2772cee6d7de0aa0c25d4a25106..3e5b939eb65a1cefbcb580057e894ddff343931b 100644 --- a/src/main/java/info/avalon566/shardingscaling/job/DatabaseSyncJob.java +++ b/src/main/java/info/avalon566/shardingscaling/job/DatabaseSyncJob.java @@ -21,6 +21,7 @@ import info.avalon566.shardingscaling.job.config.SyncConfiguration; import lombok.extern.slf4j.Slf4j; /** + * Database sync job. * @author avalon566 */ @Slf4j @@ -32,12 +33,15 @@ public class DatabaseSyncJob { private final RealtimeDataSyncer realtimeDataSyncer; - public DatabaseSyncJob(SyncConfiguration syncConfiguration) { + public DatabaseSyncJob(final SyncConfiguration syncConfiguration) { this.syncConfiguration = syncConfiguration; this.historyDataSyncer = new HistoryDataSyncer(syncConfiguration); this.realtimeDataSyncer = new RealtimeDataSyncer(syncConfiguration); } + /** + * Run. + */ public void run() { realtimeDataSyncer.preRun(); historyDataSyncer.run(); diff --git a/src/main/java/info/avalon566/shardingscaling/sync/core/AbstractRunner.java b/src/main/java/info/avalon566/shardingscaling/sync/core/AbstractRunner.java index 0d2ee96e8dd8f50522434bffd20c4d86b5f7c8e5..4b5425ebdddc573cf0499a649922cc8cbc186aa3 100644 --- a/src/main/java/info/avalon566/shardingscaling/sync/core/AbstractRunner.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/core/AbstractRunner.java @@ -17,18 +17,31 @@ package info.avalon566.shardingscaling.sync.core; +import lombok.AccessLevel; +import lombok.Getter; +import lombok.Setter; + /** + * Abstract runner. * @author avalon566 */ public abstract class AbstractRunner implements Runner { - protected boolean running = false; + @Setter(AccessLevel.PROTECTED) + @Getter(AccessLevel.PROTECTED) + private boolean running; + /** + * generic start implement. + */ @Override public void start() { running = true; } + /** + * generic stop implement. + */ @Override public void stop() { running = false; diff --git a/src/main/java/info/avalon566/shardingscaling/sync/core/Channel.java b/src/main/java/info/avalon566/shardingscaling/sync/core/Channel.java index 1285bd62f36d18c402e75dac1b58cf0824b5b94a..201384b8daf6175969280e876b9d9ed27e4935d7 100755 --- a/src/main/java/info/avalon566/shardingscaling/sync/core/Channel.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/core/Channel.java @@ -18,10 +18,20 @@ package info.avalon566.shardingscaling.sync.core; /** + * Channel. * @author avalon566 */ public interface Channel { + + /** + * push a {@code DataRecord} to channel. + * @param dataRecord data + */ void pushRecord(Record dataRecord); + /** + * pop a {@code DataRecord} from channel. + * @return dataRecord + */ Record popRecord(); -} \ No newline at end of file +} diff --git a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcReader.java b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcReader.java index 92e750a769a21e86b63eb9c018bdb25aa5738e8f..80b67f298ae3bd11c350a1942a82767694e6ef06 100755 --- a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcReader.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcReader.java @@ -73,7 +73,7 @@ public abstract class AbstractJdbcReader extends AbstractRunner implements Reade ps.setFetchDirection(ResultSet.FETCH_REVERSE); var rs = ps.executeQuery(); var metaData = rs.getMetaData(); - while (running && rs.next()) { + while (isRunning() && rs.next()) { var record = new DataRecord(metaData.getColumnCount()); record.setType("bootstrap-insert"); record.setFullTableName(String.format("%s.%s", conn.getCatalog(), rdbmsConfiguration.getTableName())); diff --git a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcWriter.java b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcWriter.java index 63467bb3d4bb6d2ac5e9c549f73d81f83f019193..22a8b78e10400c4d8b5f310ba4a63ce04c965564 100755 --- a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcWriter.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/AbstractJdbcWriter.java @@ -66,7 +66,7 @@ public abstract class AbstractJdbcWriter extends AbstractRunner implements Write var buffer = new ArrayList(2000); var lastFlushTime = System.currentTimeMillis(); try { - while (running) { + while (isRunning()) { var record = channel.popRecord(); if (null == record) { try { diff --git a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/Column.java b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/Column.java index 2b203a4febd258bbef608f0cf5ef645f73f939ad..47f56ded7de9b7c183c55ae8a495b2e949f942f5 100755 --- a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/Column.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/Column.java @@ -21,11 +21,13 @@ import lombok.AllArgsConstructor; import lombok.Data; /** + * Column. * @author avalon566 */ @Data @AllArgsConstructor public class Column { private Object value; + private boolean updated; -} \ No newline at end of file +} diff --git a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/ColumnMetaData.java b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/ColumnMetaData.java index e54e0cc5de95c17ca01fbdd1281f61f46a3632eb..6a9d5003da949872a3815d885e3b3b83d40ab0e3 100644 --- a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/ColumnMetaData.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/ColumnMetaData.java @@ -20,6 +20,7 @@ package info.avalon566.shardingscaling.sync.jdbc; import lombok.Data; /** + * Column meta data. * @author avalon566 */ @Data diff --git a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/DataRecord.java b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/DataRecord.java index 627b4bf89a1e101a9121cd6950252d86b4bedb61..a5ea4a42c9086ac90e1f58d9fe45c3ad769aec65 100755 --- a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/DataRecord.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/DataRecord.java @@ -24,32 +24,54 @@ import java.util.ArrayList; import java.util.List; /** + * Data record. * @author avalon566 */ @Data public class DataRecord implements Record { + private String type; + private String tableName; + private String fullTableName; + private final List columns; - public DataRecord(int columnCount) { - columns = new ArrayList(columnCount); + public DataRecord(final int columnCount) { + columns = new ArrayList<>(columnCount); } - public void addColumn(Column data) { + /** + * Add a column to record. + * @param data column + */ + public void addColumn(final Column data) { columns.add(data); } + /** + * Return column count. + * @return count + */ public int getColumnCount() { return columns.size(); } - public Column getColumn(int index) { + /** + * Get column by index. + * @param index of column + * @return column + */ + public Column getColumn(final int index) { return columns.get(index); } + /** + * Get table name. + * @return tableName + */ public String getTableName() { return fullTableName.split("\\.")[1]; } -} \ No newline at end of file +} diff --git a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/DbMetaDataUtil.java b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/DbMetaDataUtil.java index 785c7bd35b7e67717b927723b358b36170be5d22..003f9326b3a6db3241aaa2ee775100749bb3f0cb 100755 --- a/src/main/java/info/avalon566/shardingscaling/sync/jdbc/DbMetaDataUtil.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/jdbc/DbMetaDataUtil.java @@ -26,12 +26,15 @@ import lombok.var; import java.sql.DriverManager; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutionException; /** + * Database meta util. + * * @author avalon566 */ @Slf4j @@ -51,14 +54,14 @@ public final class DbMetaDataUtil { private LoadingCache> cmdCache; - public DbMetaDataUtil(RdbmsConfiguration rdbmsConfiguration) { + public DbMetaDataUtil(final RdbmsConfiguration rdbmsConfiguration) { this.rdbmsConfiguration = rdbmsConfiguration; pksCache = CacheBuilder.newBuilder() .maximumSize(64) .build(new CacheLoader>() { @Override - public List load(String key) { + public List load(final String key) { return getPrimaryKeysInternal(key); } }); @@ -67,13 +70,19 @@ public final class DbMetaDataUtil { .maximumSize(64) .build(new CacheLoader>() { @Override - public List load(String key) { + public List load(final String key) { return getColumNamesInternal(key); } }); } - public List getPrimaryKeys(String tableName) { + /** + * Get primary key column name by table name. + * + * @param tableName table name + * @return list of table name + */ + public List getPrimaryKeys(final String tableName) { try { return pksCache.get(tableName); } catch (ExecutionException e) { @@ -81,7 +90,7 @@ public final class DbMetaDataUtil { } } - private List getPrimaryKeysInternal(String tableName) { + private List getPrimaryKeysInternal(final String tableName) { try { try (var connection = DriverManager.getConnection(rdbmsConfiguration.getJdbcUrl(), rdbmsConfiguration.getUsername(), rdbmsConfiguration.getPassword())) { var rs = connection.getMetaData().getPrimaryKeys(connection.getCatalog(), null, tableName); @@ -91,11 +100,16 @@ public final class DbMetaDataUtil { } return primaryKeys; } - } catch (Exception e) { + } catch (SQLException e) { throw new RuntimeException("getTableNames error", e); } } + /** + * Get all table names in current database. + * + * @return list of table name + */ public List getTableNames() { try { try (var connection = DriverManager.getConnection(rdbmsConfiguration.getJdbcUrl(), rdbmsConfiguration.getUsername(), rdbmsConfiguration.getPassword())) { @@ -106,12 +120,18 @@ public final class DbMetaDataUtil { } return tableNames; } - } catch (Exception e) { + } catch (SQLException e) { throw new RuntimeException("getTableNames error", e); } } - public List getColumNames(String tableName) { + /** + * Get all column meta data by table name. + * + * @param tableName table name + * @return list of column meta data + */ + public List getColumNames(final String tableName) { try { return cmdCache.get(tableName); } catch (ExecutionException e) { @@ -119,36 +139,38 @@ public final class DbMetaDataUtil { } } - public List getColumNamesInternal(String tableName) { + private List getColumNamesInternal(final String tableName) { try { try (var connection = DriverManager.getConnection(rdbmsConfiguration.getJdbcUrl(), rdbmsConfiguration.getUsername(), rdbmsConfiguration.getPassword())) { var result = new ArrayList(); - try (ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), connection.getSchema(), tableName, "%")) { - while (resultSet.next()) { - var columnMetaData = new ColumnMetaData(); - columnMetaData.setColumnName(resultSet.getString(COLUMN_NAME)); - columnMetaData.setColumnType(resultSet.getInt(DATA_TYPE)); - columnMetaData.setColumnTypeName(resultSet.getString(TYPE_NAME)); - result.add(columnMetaData); - } + ResultSet resultSet = connection.getMetaData().getColumns(connection.getCatalog(), connection.getSchema(), tableName, "%"); + while (resultSet.next()) { + var columnMetaData = new ColumnMetaData(); + columnMetaData.setColumnName(resultSet.getString(COLUMN_NAME)); + columnMetaData.setColumnType(resultSet.getInt(DATA_TYPE)); + columnMetaData.setColumnTypeName(resultSet.getString(TYPE_NAME)); + result.add(columnMetaData); } return result; } - } catch (Exception e) { + } catch (SQLException e) { throw new RuntimeException("getTableNames error", e); } } - public static int findColumnIndex(List metaData, String columnName) { - try { - for (int i = 0; i < metaData.size(); i++) { - if (metaData.get(i).getColumnName().equals(columnName)) { - return i; - } + /** + * Find column index by column name. + * + * @param metaData meta data list + * @param columnName table name + * @return index + */ + public static int findColumnIndex(final List metaData, final String columnName) { + for (int i = 0; i < metaData.size(); i++) { + if (metaData.get(i).getColumnName().equals(columnName)) { + return i; } - return -1; - } catch (Exception e) { - throw new RuntimeException("findColumnIndex error", e); } + return -1; } } diff --git a/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/AbstractPacket.java b/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/AbstractPacket.java index 3ba9ffb9695f32b4e957e86d74713a52cd0c3b1f..8a1e41828121fb70db1f5e0f45cf9ffccd46a32f 100644 --- a/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/AbstractPacket.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/AbstractPacket.java @@ -35,11 +35,19 @@ import lombok.Data; public abstract class AbstractPacket implements Packet { private byte sequenceNumber; + /** + * empty implement method,throw {@code UnsupportedOperationException}. + * @param data buffer + */ @Override public void fromByteBuf(final ByteBuf data) { throw new UnsupportedOperationException(); } + /** + * empty implement method,throw {@code UnsupportedOperationException}. + * @return data buffer + */ @Override public ByteBuf toByteBuf() { throw new UnsupportedOperationException(); diff --git a/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/binlog/ColumnTypes.java b/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/binlog/ColumnTypes.java index 6a184160ed7c143cbfc66e71e6d48e612f70321f..cf1e4174d390016987ba0146c2a3d2666e9f5aea 100644 --- a/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/binlog/ColumnTypes.java +++ b/src/main/java/info/avalon566/shardingscaling/sync/mysql/binlog/packet/binlog/ColumnTypes.java @@ -18,6 +18,7 @@ package info.avalon566.shardingscaling.sync.mysql.binlog.packet.binlog; /** + * Mysql column types. * @author avalon566 */ public final class ColumnTypes { diff --git a/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/MySQLBinlogEventPacketDecoderTest.java b/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/MySQLBinlogEventPacketDecoderTest.java index 5a3599ab319cf8d6c1960c90d32c3d4a0b90bf41..e7d12ec7a7b2a188349b5e334447798cb1e214e6 100644 --- a/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/MySQLBinlogEventPacketDecoderTest.java +++ b/src/test/java/info/avalon566/shardingscaling/sync/mysql/binlog/codec/MySQLBinlogEventPacketDecoderTest.java @@ -50,7 +50,7 @@ public class MySQLBinlogEventPacketDecoderTest { @Before public void setUp() throws Exception { - binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(); + binlogEventPacketDecoder = new MySQLBinlogEventPacketDecoder(0); binlogContext = ReflectionUtil.getFieldValueFromClass(binlogEventPacketDecoder, "binlogContext", BinlogContext.class); binlogContext.setChecksumLength(4); }