提交 a6e84f83 编写于 作者: A avalon566

Decode unsupported mysql binlog evnet to placeholder event

上级 6f405a30
......@@ -18,13 +18,13 @@
package info.avalon566.shardingscaling;
import com.google.gson.Gson;
import info.avalon566.shardingscaling.core.config.SyncConfiguration;
import info.avalon566.shardingscaling.core.config.ScalingConfiguration;
import info.avalon566.shardingscaling.core.config.ScalingContext;
import info.avalon566.shardingscaling.core.config.RuleConfiguration;
import info.avalon566.shardingscaling.core.config.RdbmsConfiguration;
import info.avalon566.shardingscaling.core.config.DataSourceConfiguration;
import info.avalon566.shardingscaling.core.config.JdbcDataSourceConfiguration;
import info.avalon566.shardingscaling.core.config.RdbmsConfiguration;
import info.avalon566.shardingscaling.core.config.RuleConfiguration;
import info.avalon566.shardingscaling.core.config.ScalingConfiguration;
import info.avalon566.shardingscaling.core.config.ScalingContext;
import info.avalon566.shardingscaling.core.config.SyncConfiguration;
import info.avalon566.shardingscaling.core.config.SyncType;
import info.avalon566.shardingscaling.core.job.MigrateProgress;
import info.avalon566.shardingscaling.core.job.ScalingController;
......@@ -75,7 +75,9 @@ public class Bootstrap {
break;
}
for (MigrateProgress progress : scalingController.getProgresses()) {
log.info(progress.getLogPosition().toString());
if (null != progress.getLogPosition()) {
log.info(progress.getLogPosition().toString());
}
}
}
}
......
......@@ -20,6 +20,7 @@ package info.avalon566.shardingscaling.core.sync.channel;
import info.avalon566.shardingscaling.core.sync.reader.LogPosition;
import info.avalon566.shardingscaling.core.sync.record.DataRecord;
import info.avalon566.shardingscaling.core.sync.record.FinishedRecord;
import info.avalon566.shardingscaling.core.sync.record.PlaceholderRecord;
import info.avalon566.shardingscaling.core.sync.record.Record;
import java.util.Collections;
......@@ -28,6 +29,8 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
/**
......@@ -55,6 +58,8 @@ public class RealtimeSyncChannel implements Channel {
private Map<LogPosition, Record> pendingAcknowledgeRecords = new ConcurrentHashMap<>();
private final Timer timer = new Timer();
public RealtimeSyncChannel(final int channelNumber) {
this(channelNumber, new LinkedList<AckCallback>());
}
......@@ -63,8 +68,13 @@ public class RealtimeSyncChannel implements Channel {
this.channelNumber = channelNumber;
this.ackCallbacks = ackCallbacks;
for (int i = 0; i < channelNumber; i++) {
channels.put(Integer.toString(i), new MemoryChannel(Collections.singletonList((AckCallback) new SingleChannelAckCallback())));
if (0 < ackCallbacks.size()) {
channels.put(Integer.toString(i), new MemoryChannel(Collections.singletonList((AckCallback) new SingleChannelAckCallback())));
} else {
channels.put(Integer.toString(i), new MemoryChannel());
}
}
scheduleAckRecords();
}
@Override
......@@ -82,6 +92,11 @@ public class RealtimeSyncChannel implements Channel {
DataRecord dataRecord = (DataRecord) record;
String index = Integer.toString(dataRecord.getTableName().hashCode() % channelNumber);
channels.get(index).pushRecord(dataRecord);
} else if (PlaceholderRecord.class.equals(record.getClass())) {
if (0 < ackCallbacks.size()) {
toBeAcknowledgeRecords.add(record);
pendingAcknowledgeRecords.put(record.getLogPosition(), record);
}
} else {
throw new RuntimeException("Not Support Record Type");
}
......@@ -115,31 +130,39 @@ public class RealtimeSyncChannel implements Channel {
}
}
private void scheduleAckRecords() {
timer.schedule(new TimerTask() {
@Override
public void run() {
synchronized (RealtimeSyncChannel.this) {
Iterator<Record> iterator = toBeAcknowledgeRecords.iterator();
List<Record> result = new LinkedList<>();
while (iterator.hasNext()) {
Record record = iterator.next();
if (pendingAcknowledgeRecords.containsKey(record.getLogPosition())) {
result.add(record);
iterator.remove();
pendingAcknowledgeRecords.remove(record.getLogPosition());
} else {
break;
}
}
if (0 < ackCallbacks.size() && 0 < result.size()) {
for (AckCallback each : ackCallbacks) {
each.onAck(result);
}
}
}
}
}, 5000, 1000);
}
class SingleChannelAckCallback implements AckCallback {
@Override
public void onAck(final List<Record> records) {
synchronized (RealtimeSyncChannel.this) {
for (Record record : records) {
pendingAcknowledgeRecords.put(record.getLogPosition(), record);
}
Iterator<Record> iterator = toBeAcknowledgeRecords.iterator();
List<Record> result = new LinkedList<>();
while (iterator.hasNext()) {
Record record = iterator.next();
if (pendingAcknowledgeRecords.containsKey(record.getLogPosition())) {
result.add(record);
iterator.remove();
pendingAcknowledgeRecords.remove(record.getLogPosition());
} else {
break;
}
}
if (0 < ackCallbacks.size() && 0 < result.size()) {
for (AckCallback each : ackCallbacks) {
each.onAck(result);
}
}
for (Record record : records) {
pendingAcknowledgeRecords.put(record.getLogPosition(), record);
}
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.core.sync.record;
import info.avalon566.shardingscaling.core.sync.reader.LogPosition;
/**
* Placeholder record.
*
* @author avalon566
*/
public class PlaceholderRecord extends Record {
public PlaceholderRecord(final LogPosition logPosition) {
super(logPosition);
}
}
......@@ -27,9 +27,11 @@ import info.avalon566.shardingscaling.core.sync.record.Column;
import info.avalon566.shardingscaling.core.sync.record.DataRecord;
import info.avalon566.shardingscaling.core.sync.metadata.JdbcUri;
import info.avalon566.shardingscaling.core.sync.record.FinishedRecord;
import info.avalon566.shardingscaling.core.sync.record.PlaceholderRecord;
import info.avalon566.shardingscaling.core.sync.record.Record;
import info.avalon566.shardingscaling.mysql.binlog.event.AbstractBinlogEvent;
import info.avalon566.shardingscaling.mysql.binlog.event.DeleteRowsEvent;
import info.avalon566.shardingscaling.mysql.binlog.event.PlaceholderEvent;
import info.avalon566.shardingscaling.mysql.binlog.event.UpdateRowsEvent;
import info.avalon566.shardingscaling.mysql.binlog.event.WriteRowsEvent;
import info.avalon566.shardingscaling.core.sync.util.DataSourceFactory;
......@@ -117,6 +119,8 @@ public final class MySQLBinlogReader extends AbstractSyncRunner implements LogRe
handleUpdateRowsEvent(channel, uri, (UpdateRowsEvent) event);
} else if (event instanceof DeleteRowsEvent) {
handleDeleteRowsEvent(channel, uri, (DeleteRowsEvent) event);
} else if (event instanceof PlaceholderEvent) {
handlePlaceholderEvent(channel, (PlaceholderEvent) event);
}
}
pushRecord(channel, new FinishedRecord(new NopLogPosition()));
......@@ -174,6 +178,11 @@ public final class MySQLBinlogReader extends AbstractSyncRunner implements LogRe
}
}
private void handlePlaceholderEvent(final Channel channel, final PlaceholderEvent event) {
PlaceholderRecord record = new PlaceholderRecord(new BinlogPosition("1", event.getFileName(), event.getPosition()));
pushRecord(channel, record);
}
private void pushRecord(final Channel channel, final Record record) {
try {
channel.pushRecord(record);
......
......@@ -19,6 +19,7 @@ package info.avalon566.shardingscaling.mysql.binlog.codec;
import info.avalon566.shardingscaling.mysql.binlog.BinlogContext;
import info.avalon566.shardingscaling.mysql.binlog.event.DeleteRowsEvent;
import info.avalon566.shardingscaling.mysql.binlog.event.PlaceholderEvent;
import info.avalon566.shardingscaling.mysql.binlog.event.UpdateRowsEvent;
import info.avalon566.shardingscaling.mysql.binlog.event.WriteRowsEvent;
import info.avalon566.shardingscaling.mysql.binlog.packet.binlog.EventTypes;
......@@ -78,6 +79,7 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
out.add(decodeDeleteRowsEventV2(binlogEventHeader, in));
break;
default:
out.add(createPlaceholderEvent(binlogEventHeader));
DataTypesCodec.skipBytes(in.readableBytes(), in);
}
if (in.isReadable()) {
......@@ -149,6 +151,13 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder {
return result;
}
private PlaceholderEvent createPlaceholderEvent(final BinlogEventHeader binlogEventHeader) {
PlaceholderEvent result = new PlaceholderEvent();
result.setFileName(binlogContext.getFileName());
result.setPosition(binlogEventHeader.getEndLogPos());
return result;
}
private void decodeTableMapEvent(final ByteBuf in) {
TableMapEvent tableMapLogEvent = new TableMapEvent();
tableMapLogEvent.parsePostHeader(in);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package info.avalon566.shardingscaling.mysql.binlog.event;
/**
* Placeholder binlog event, unsupported binlog event will replace it into this class.
*
* @author avalon566
*/
public class PlaceholderEvent extends AbstractBinlogEvent {
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册