diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java index 854205588f6db9c8567137690dd8ca618f7db492..0637cda4a4fea2311ab57e433fb51415f3f03aa2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java @@ -186,7 +186,7 @@ public class IoTDBAirGapConnector extends IoTDBConnector { return; } - if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) { if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { transfer( ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern()); @@ -226,7 +226,7 @@ public class IoTDBAirGapConnector extends IoTDBConnector { return; } - if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) { for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { transfer(event); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java index d983da389bda5cde551439f9e983dfcbeaca9fe8..cb2c68ab6806ce2e96f95c7a8a6ed809af90c884 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java @@ -150,7 +150,7 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector { return; } - if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) { if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { transfer( ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern()); @@ -291,7 +291,7 @@ public class IoTDBThriftAsyncConnector extends IoTDBConnector { return; } - if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) { for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { transfer(event); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java index f75bc7c612a163fec759ff5a7169fdca847f17ae..dfecd2170b354a11789139b14a8ff43d5dcaf3cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncConnector.java @@ -185,7 +185,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector { return; } - if (((EnrichedEvent) tabletInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tabletInsertionEvent).shouldParsePatternOrTime()) { if (tabletInsertionEvent instanceof PipeInsertNodeTabletInsertionEvent) { transfer( ((PipeInsertNodeTabletInsertionEvent) tabletInsertionEvent).parseEventWithPattern()); @@ -231,7 +231,7 @@ public class IoTDBThriftSyncConnector extends IoTDBConnector { return; } - if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePattern()) { + if (((EnrichedEvent) tsFileInsertionEvent).shouldParsePatternOrTime()) { for (final TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) { transfer(event); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java index 5c2651e584735fb37254fc461baabf2ce2a27e2b..269d21f6a5d733c9eed78bcd8b6fb31cfeb1cc2a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/EnrichedEvent.java @@ -39,13 +39,14 @@ public abstract class EnrichedEvent implements Event { protected final PipeTaskMeta pipeTaskMeta; private final String pattern; - private final boolean isPatternParsed; + protected boolean isPatternAndTimeParsed; protected EnrichedEvent(PipeTaskMeta pipeTaskMeta, String pattern) { referenceCount = new AtomicInteger(0); this.pipeTaskMeta = pipeTaskMeta; this.pattern = pattern; - isPatternParsed = getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE); + isPatternAndTimeParsed = + getPattern().equals(PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE); } /** @@ -130,8 +131,8 @@ public abstract class EnrichedEvent implements Event { return pattern == null ? PipeExtractorConstant.EXTRACTOR_PATTERN_DEFAULT_VALUE : pattern; } - public boolean shouldParsePattern() { - return !isPatternParsed; + public boolean shouldParsePatternOrTime() { + return !isPatternAndTimeParsed; } public abstract EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 5f30d3f934ec6f1274c8aa4f4380642a083cd781..bd36d7eba5660e753eeb97e81b3ae501efb8b8ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -140,7 +140,7 @@ public class PipeRawTabletInsertionEvent extends EnrichedEvent implements Tablet } public Tablet convertToTablet() { - if (!shouldParsePattern()) { + if (!shouldParsePatternOrTime()) { return tablet; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index d1d1adf1d4eb05799ef739222389e6efbf0f2af7..4c7d7f6fb0e56542db75e320cd096e1acfc60d65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -69,6 +69,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns this.startTime = startTime; this.endTime = endTime; + if (hasTimeFilter()) { + this.isPatternAndTimeParsed = false; + } this.resource = resource; tsFile = resource.getTsFile();