未验证 提交 3116fb5e 编写于 作者: 家里敷泥呀's avatar 家里敷泥呀 提交者: GitHub

issue-5390: sharding-scaling nullpoint (#5679)

Co-authored-by: NLucas <qiulu3@jd.com>
上级 2180bc5a
......@@ -44,6 +44,7 @@ import org.apache.shardingsphere.sql.parser.binder.metadata.table.TableMetaData;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.util.Objects;
/**
* MySQL binlog reader.
......@@ -85,25 +86,33 @@ public final class MySQLBinlogReader extends AbstractSyncExecutor implements Log
while (isRunning()) {
AbstractBinlogEvent event = client.poll();
if (null == event) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
sleep();
continue;
}
if (event instanceof WriteRowsEvent) {
handleWriteRowsEvent(channel, uri, (WriteRowsEvent) event);
} else if (event instanceof UpdateRowsEvent) {
handleUpdateRowsEvent(channel, uri, (UpdateRowsEvent) event);
} else if (event instanceof DeleteRowsEvent) {
handleDeleteRowsEvent(channel, uri, (DeleteRowsEvent) event);
} else if (event instanceof PlaceholderEvent) {
createPlaceholderRecord(channel, event);
}
handleEvent(channel, uri, event);
}
pushRecord(channel, new FinishedRecord(new NopLogPosition()));
}
private void sleep() {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
}
private void handleEvent(final Channel channel, final JdbcUri uri, final AbstractBinlogEvent event) {
if (event instanceof WriteRowsEvent) {
handleWriteRowsEvent(channel, uri, (WriteRowsEvent) event);
} else if (event instanceof UpdateRowsEvent) {
handleUpdateRowsEvent(channel, uri, (UpdateRowsEvent) event);
} else if (event instanceof DeleteRowsEvent) {
handleDeleteRowsEvent(channel, uri, (DeleteRowsEvent) event);
} else if (event instanceof PlaceholderEvent) {
createPlaceholderRecord(channel, event);
}
}
private void handleWriteRowsEvent(final Channel channel, final JdbcUri uri, final WriteRowsEvent event) {
if (filter(uri.getDatabase(), event.getSchemaName(), event.getTableName())) {
createPlaceholderRecord(channel, event);
......@@ -134,7 +143,7 @@ public final class MySQLBinlogReader extends AbstractSyncExecutor implements Log
for (int j = 0; j < beforeValues.length; j++) {
Object oldValue = beforeValues[j];
Object newValue = afterValues[j];
record.addColumn(new Column(tableMetaData.getColumnMetaData(j).getName(), newValue, !newValue.equals(oldValue), tableMetaData.isPrimaryKey(j)));
record.addColumn(new Column(tableMetaData.getColumnMetaData(j).getName(), newValue, !Objects.equals(newValue, oldValue), tableMetaData.isPrimaryKey(j)));
}
pushRecord(channel, record);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册