diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java index edbc14e26d4b5f6cbce483d1be9fc51a49519f15..cbd6d77f189a58871860e15a650bfaab28d94f9c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java @@ -155,7 +155,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio } private void extractHeartbeat(PipeRealtimeEvent event) { - if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) { + Event lastEvent = pendingQueue.peekLast(); + if (lastEvent instanceof PipeRealtimeEvent + && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) { // if the last event in the pending queue is a heartbeat event, we should not extract any more // heartbeat events to avoid OOM when the pipe is stopped. event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java index fb3783dc224f8ae468b67b863815ed23fb23a310..52970f2957d164f10c7049e552ce74a59ea3b299 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionLogExtractor.java @@ -75,7 +75,9 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx } private void extractHeartbeat(PipeRealtimeEvent event) { - if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) { + Event lastEvent = pendingQueue.peekLast(); + if (lastEvent instanceof PipeRealtimeEvent + && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) { // if the last event in the pending queue is a heartbeat event, we should not extract any more // heartbeat events to avoid OOM when the pipe is stopped. event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java index 5967b1bc87bd5d59318d24ec032468fe676b51ce..816e0249b473f959096b0535e431de90b7304884 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionTsFileExtractor.java @@ -75,7 +75,9 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio } private void extractHeartbeat(PipeRealtimeEvent event) { - if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) { + Event lastEvent = pendingQueue.peekLast(); + if (lastEvent instanceof PipeRealtimeEvent + && ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) { // if the last event in the pending queue is a heartbeat event, we should not extract any more // heartbeat events to avoid OOM when the pipe is stopped. event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());