未验证 提交 35ffb512 编写于 作者: 马子坤 提交者: GitHub

[IOTDB-6135] Pipe: Fix a bug which keeps generating PipeHeartbeatEvent unnecessarily (#11008)

上级 cd57fefb
...@@ -155,7 +155,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio ...@@ -155,7 +155,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
} }
private void extractHeartbeat(PipeRealtimeEvent event) { private void extractHeartbeat(PipeRealtimeEvent event) {
if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) { Event lastEvent = pendingQueue.peekLast();
if (lastEvent instanceof PipeRealtimeEvent
&& ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
// if the last event in the pending queue is a heartbeat event, we should not extract any more // if the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped. // heartbeat events to avoid OOM when the pipe is stopped.
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName()); event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
......
...@@ -75,7 +75,9 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx ...@@ -75,7 +75,9 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
} }
private void extractHeartbeat(PipeRealtimeEvent event) { private void extractHeartbeat(PipeRealtimeEvent event) {
if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) { Event lastEvent = pendingQueue.peekLast();
if (lastEvent instanceof PipeRealtimeEvent
&& ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
// if the last event in the pending queue is a heartbeat event, we should not extract any more // if the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped. // heartbeat events to avoid OOM when the pipe is stopped.
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName()); event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
......
...@@ -75,7 +75,9 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio ...@@ -75,7 +75,9 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
} }
private void extractHeartbeat(PipeRealtimeEvent event) { private void extractHeartbeat(PipeRealtimeEvent event) {
if (pendingQueue.peekLast() instanceof PipeHeartbeatEvent) { Event lastEvent = pendingQueue.peekLast();
if (lastEvent instanceof PipeRealtimeEvent
&& ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
// if the last event in the pending queue is a heartbeat event, we should not extract any more // if the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped. // heartbeat events to avoid OOM when the pipe is stopped.
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName()); event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册