未验证 提交 eb43b490 编写于 作者: C Caideyipi 提交者: GitHub

Pipe: Report queue size in PipeHeartbeatEvent (#10997)

上级 00cb64c2
......@@ -23,11 +23,17 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.event.Event;
import com.lmax.disruptor.RingBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Deque;
public class PipeHeartbeatEvent extends EnrichedEvent {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeHeartbeatEvent.class);
......@@ -40,6 +46,11 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private long timeProcessed;
private long timeTransferred;
private int disruptorSize;
private int extractorQueueSize;
private int bufferQueueSize;
private int connectorQueueSize;
private final boolean shouldPrintMessage;
public PipeHeartbeatEvent(String dataRegionId, boolean shouldPrintMessage) {
......@@ -86,40 +97,90 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
return false;
}
/////////////////////////////// Whether to print ///////////////////////////////
public boolean isShouldPrintMessage() {
return shouldPrintMessage;
}
/////////////////////////////// Delay Reporting ///////////////////////////////
public void bindPipeName(String pipeName) {
this.pipeName = pipeName;
if (shouldPrintMessage) {
this.pipeName = pipeName;
}
}
public void onPublished() {
timePublished = System.currentTimeMillis();
if (shouldPrintMessage) {
timePublished = System.currentTimeMillis();
}
}
public void onAssigned() {
timeAssigned = System.currentTimeMillis();
if (shouldPrintMessage) {
timeAssigned = System.currentTimeMillis();
}
}
public void onProcessed() {
timeProcessed = System.currentTimeMillis();
if (shouldPrintMessage) {
timeProcessed = System.currentTimeMillis();
}
}
public void onTransferred() {
timeTransferred = System.currentTimeMillis();
if (shouldPrintMessage) {
timeTransferred = System.currentTimeMillis();
}
}
/////////////////////////////// Queue size Reporting ///////////////////////////////
public void recordDisruptorSize(RingBuffer<?> ringBuffer) {
if (shouldPrintMessage) {
disruptorSize = ringBuffer.getBufferSize() - (int) ringBuffer.remainingCapacity();
}
}
public void recordExtractorQueueSize(UnboundedBlockingPendingQueue<Event> pendingQueue) {
if (shouldPrintMessage) {
extractorQueueSize = pendingQueue.size();
}
}
public void recordBufferQueueSize(Deque<Event> bufferQueue) {
if (shouldPrintMessage) {
bufferQueueSize = bufferQueue.size();
}
}
public void recordConnectorQueueSize(BoundedBlockingPendingQueue<Event> pendingQueue) {
if (shouldPrintMessage) {
connectorQueueSize = pendingQueue.size();
}
}
@Override
public String toString() {
final String errorMessage = "error";
final String unknownMessage = "Unknown";
final String publishedToAssignedMessage =
timeAssigned != 0 ? (timeAssigned - timePublished) + "ms" : errorMessage;
timeAssigned != 0 ? (timeAssigned - timePublished) + "ms" : unknownMessage;
final String assignedToProcessedMessage =
timeProcessed != 0 ? (timeProcessed - timeAssigned) + "ms" : errorMessage;
timeProcessed != 0 ? (timeProcessed - timeAssigned) + "ms" : unknownMessage;
final String processedToTransferredMessage =
timeTransferred != 0 ? (timeTransferred - timeProcessed) + "ms" : errorMessage;
timeTransferred != 0 ? (timeTransferred - timeProcessed) + "ms" : unknownMessage;
final String totalTimeMessage =
timeTransferred != 0 ? (timeTransferred - timePublished) + "ms" : errorMessage;
timeTransferred != 0 ? (timeTransferred - timePublished) + "ms" : unknownMessage;
final String disruptorSizeMessage = Integer.toString(disruptorSize);
final String extractorQueueSizeMessage =
timeAssigned != 0 ? Integer.toString(extractorQueueSize) : unknownMessage;
final String bufferQueueSizeMessage =
timeProcessed != 0 ? Integer.toString(bufferQueueSize) : unknownMessage;
final String connectorQueueSizeMessage =
timeProcessed != 0 ? Integer.toString(connectorQueueSize) : unknownMessage;
return "PipeHeartbeatEvent{"
+ "pipeName='"
......@@ -136,6 +197,14 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
+ processedToTransferredMessage
+ ", totalTimeCost="
+ totalTimeMessage
+ ", disruptorSize="
+ disruptorSizeMessage
+ ", extractorQueueSize="
+ extractorQueueSizeMessage
+ ", bufferQueueSize="
+ bufferQueueSizeMessage
+ ", connectorQueueSize="
+ connectorQueueSizeMessage
+ "}";
}
}
......@@ -155,11 +155,17 @@ public class PipeRealtimeDataRegionHybridExtractor extends PipeRealtimeDataRegio
}
private void extractHeartbeat(PipeRealtimeEvent event) {
// Record the pending queue size before trying to put heartbeatEvent into queue
((PipeHeartbeatEvent) event.getEvent()).recordExtractorQueueSize(pendingQueue);
Event lastEvent = pendingQueue.peekLast();
if (lastEvent instanceof PipeRealtimeEvent
&& ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
// if the last event in the pending queue is a heartbeat event, we should not extract any more
&& ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent
&& (((PipeHeartbeatEvent) ((PipeRealtimeEvent) lastEvent).getEvent()).isShouldPrintMessage()
|| !((PipeHeartbeatEvent) event.getEvent()).isShouldPrintMessage())) {
// If the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
// Besides, the printable event has higher priority to stay in queue to enable metrics report.
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridExtractor.class.getName());
return;
}
......
......@@ -75,11 +75,17 @@ public class PipeRealtimeDataRegionLogExtractor extends PipeRealtimeDataRegionEx
}
private void extractHeartbeat(PipeRealtimeEvent event) {
// Record the pending queue size before trying to put heartbeatEvent into queue
((PipeHeartbeatEvent) event.getEvent()).recordExtractorQueueSize(pendingQueue);
Event lastEvent = pendingQueue.peekLast();
if (lastEvent instanceof PipeRealtimeEvent
&& ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
// if the last event in the pending queue is a heartbeat event, we should not extract any more
&& ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent
&& (((PipeHeartbeatEvent) ((PipeRealtimeEvent) lastEvent).getEvent()).isShouldPrintMessage()
|| !((PipeHeartbeatEvent) event.getEvent()).isShouldPrintMessage())) {
// If the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
// Besides, the printable event has higher priority to stay in queue to enable metrics report.
event.decreaseReferenceCount(PipeRealtimeDataRegionLogExtractor.class.getName());
return;
}
......
......@@ -75,11 +75,17 @@ public class PipeRealtimeDataRegionTsFileExtractor extends PipeRealtimeDataRegio
}
private void extractHeartbeat(PipeRealtimeEvent event) {
// Record the pending queue size before trying to put heartbeatEvent into queue
((PipeHeartbeatEvent) event.getEvent()).recordExtractorQueueSize(pendingQueue);
Event lastEvent = pendingQueue.peekLast();
if (lastEvent instanceof PipeRealtimeEvent
&& ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent) {
// if the last event in the pending queue is a heartbeat event, we should not extract any more
&& ((PipeRealtimeEvent) lastEvent).getEvent() instanceof PipeHeartbeatEvent
&& (((PipeHeartbeatEvent) ((PipeRealtimeEvent) lastEvent).getEvent()).isShouldPrintMessage()
|| !((PipeHeartbeatEvent) event.getEvent()).isShouldPrintMessage())) {
// If the last event in the pending queue is a heartbeat event, we should not extract any more
// heartbeat events to avoid OOM when the pipe is stopped.
// Besides, the printable event has higher priority to stay in queue to enable metrics report.
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileExtractor.class.getName());
return;
}
......
......@@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.extractor.realtime.assigner;
import org.apache.iotdb.commons.concurrent.IoTDBDaemonThreadFactory;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import com.lmax.disruptor.BlockingWaitStrategy;
......@@ -56,6 +58,10 @@ public class DisruptorQueue {
}
public void publish(PipeRealtimeEvent event) {
EnrichedEvent internalEvent = event.getEvent();
if (internalEvent instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) internalEvent).recordDisruptorSize(ringBuffer);
}
ringBuffer.publishEvent((container, sequence, o) -> container.setEvent(event), event);
}
......
......@@ -43,6 +43,10 @@ public class PipeEventCollector implements EventCollector {
if (event instanceof EnrichedEvent) {
((EnrichedEvent) event).increaseReferenceCount(PipeEventCollector.class.getName());
}
if (event instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) event).recordBufferQueueSize(bufferQueue);
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
}
while (!bufferQueue.isEmpty()) {
final Event bufferedEvent = bufferQueue.peek();
......
......@@ -36,4 +36,8 @@ public class UnboundedBlockingPendingQueue<E extends Event> extends BlockingPend
public E peekLast() {
return pendingDeque.peekLast();
}
public E removeLast() {
return pendingDeque.removeLast();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册