diff --git a/deployer/src/main/resources/canal.properties b/deployer/src/main/resources/canal.properties index 46c36bf2f13c0f76b1096c37aa6ba5c605809a32..b1d65b83d50d7a1f62ff4caf44c420a92072309e 100644 --- a/deployer/src/main/resources/canal.properties +++ b/deployer/src/main/resources/canal.properties @@ -60,6 +60,9 @@ canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false +canal.instance.filter.dml.insert=false +canal.instance.filter.dml.update=false +canal.instance.filter.dml.delete=false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED diff --git a/deployer/src/main/resources/spring/default-instance.xml b/deployer/src/main/resources/spring/default-instance.xml index ae264eafc0492b8b6928ac91c686b4e35cca00c3..add43b743a7fb0f5f58d3c2dd02004adebebc62d 100644 --- a/deployer/src/main/resources/spring/default-instance.xml +++ b/deployer/src/main/resources/spring/default-instance.xml @@ -174,6 +174,9 @@ + + + diff --git a/deployer/src/main/resources/spring/file-instance.xml b/deployer/src/main/resources/spring/file-instance.xml index 21b943cde627108ad1c3116dcb92baf2d61a265c..65fab920e275ea0d7b5ca7e4f60bf5e8a18a2056 100644 --- a/deployer/src/main/resources/spring/file-instance.xml +++ b/deployer/src/main/resources/spring/file-instance.xml @@ -160,6 +160,9 @@ + + + diff --git a/deployer/src/main/resources/spring/group-instance.xml b/deployer/src/main/resources/spring/group-instance.xml index 1f0946debff31aea7ecefe8c017a844a17f4505d..bd808b2478ccae8fae7d72f15e7fbf73585feffc 100644 --- a/deployer/src/main/resources/spring/group-instance.xml +++ b/deployer/src/main/resources/spring/group-instance.xml @@ -157,6 +157,9 @@ + + + @@ -261,6 +264,9 @@ + + + diff --git a/deployer/src/main/resources/spring/memory-instance.xml b/deployer/src/main/resources/spring/memory-instance.xml index 451ab9951f2c677ea284e5ae41631328f9a25b2a..5ec293e0bebd333a276147e8fd1e54d753977f70 100644 --- a/deployer/src/main/resources/spring/memory-instance.xml +++ b/deployer/src/main/resources/spring/memory-instance.xml @@ -148,6 +148,9 @@ + + + diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java index 7ae01f6327eeb89c63221478e477799d6688dbb9..a8768131c29509102bf2b043bc9596b75c48e1bc 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java @@ -16,6 +16,7 @@ import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.DefaultTableMetaTSDBFact import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDB; import com.alibaba.otter.canal.parse.inbound.mysql.tsdb.TableMetaTSDBFactory; import com.alibaba.otter.canal.protocol.position.EntryPosition; +import org.apache.commons.lang.StringUtils; public abstract class AbstractMysqlEventParser extends AbstractEventParser { @@ -37,6 +38,10 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser { protected boolean filterRows = false; protected boolean filterTableError = false; protected boolean useDruidDdlFilter = true; + + protected boolean filterDmlInsert = false; + protected boolean filterDmlUpdate = false; + protected boolean filterDmlDelete = false; // instance received binlog bytes protected final AtomicLong receivedBinlogBytes = new AtomicLong(0L); private final AtomicLong eventsPublishBlockingTime = new AtomicLong(0L); @@ -173,7 +178,7 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser { parallelThreadSize, (LogEventConvert) binlogParser, transactionBuffer, - destination); + destination, filterDmlInsert, filterDmlUpdate, filterDmlDelete); mysqlMultiStageCoprocessor.setEventsPublishBlockingTime(eventsPublishBlockingTime); return mysqlMultiStageCoprocessor; } @@ -224,6 +229,30 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser { this.useDruidDdlFilter = useDruidDdlFilter; } + public boolean isFilterDmlInsert() { + return filterDmlInsert; + } + + public void setFilterDmlInsert(boolean filterDmlInsert) { + this.filterDmlInsert = filterDmlInsert; + } + + public boolean isFilterDmlUpdate() { + return filterDmlUpdate; + } + + public void setFilterDmlUpdate(boolean filterDmlUpdate) { + this.filterDmlUpdate = filterDmlUpdate; + } + + public boolean isFilterDmlDelete() { + return filterDmlDelete; + } + + public void setFilterDmlDelete(boolean filterDmlDelete) { + this.filterDmlDelete = filterDmlDelete; + } + public void setEnableTsdb(boolean enableTsdb) { this.enableTsdb = enableTsdb; if (this.enableTsdb) { diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java index 749cfa61ce5cb8366399648fce86aa81f82a8bd7..00b703930ce1f95b0ce0d21eb224a1f3e2b85dca 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java @@ -72,13 +72,20 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement private BatchEventProcessor sinkStoreStage; private LogContext logContext; + protected boolean filterDmlInsert = false; + protected boolean filterDmlUpdate = false; + protected boolean filterDmlDelete = false; + public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert, - EventTransactionBuffer transactionBuffer, String destination){ + EventTransactionBuffer transactionBuffer, String destination, boolean filterDmlInsert, boolean filterDmlUpdate, boolean filterDmlDelete){ this.ringBufferSize = ringBufferSize; this.parserThreadCount = parserThreadCount; this.logEventConvert = logEventConvert; this.transactionBuffer = transactionBuffer; this.destination = destination; + this.filterDmlInsert = filterDmlInsert; + this.filterDmlUpdate = filterDmlUpdate; + this.filterDmlDelete = filterDmlDelete; } @Override @@ -272,18 +279,18 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement case LogEvent.WRITE_ROWS_EVENT_V1: case LogEvent.WRITE_ROWS_EVENT: tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent); - needDmlParse = true; + needDmlParse = !filterDmlInsert;//true; break; case LogEvent.UPDATE_ROWS_EVENT_V1: case LogEvent.PARTIAL_UPDATE_ROWS_EVENT: case LogEvent.UPDATE_ROWS_EVENT: tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent); - needDmlParse = true; + needDmlParse = !filterDmlUpdate;//true; break; case LogEvent.DELETE_ROWS_EVENT_V1: case LogEvent.DELETE_ROWS_EVENT: tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent); - needDmlParse = true; + needDmlParse = !filterDmlDelete;//true; break; case LogEvent.ROWS_QUERY_LOG_EVENT: needDmlParse = true;