提交 9c68f02c 编写于 作者: P Piotr Nowojski 提交者: Piotr Nowojski

[FLINK-20717][metrics] Provide new backPressuredTimeMsPerSecond metric

上级 8d4be699
...@@ -1250,7 +1250,12 @@ Certain RocksDB native metrics are available but disabled by default, you can fi ...@@ -1250,7 +1250,12 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
</tr> </tr>
<tr> <tr>
<td>idleTimeMsPerSecond</td> <td>idleTimeMsPerSecond</td>
<td>The time (in milliseconds) this task is idle (either has no data to process or it is back pressured) per second.</td> <td>The time (in milliseconds) this task is idle (has no data to process) per second. Idle time excludes back pressured time, so if the task is back pressured it is not idle.</td>
<td>Meter</td>
</tr>
<tr>
<td>backPressuredTimeMsPerSecond</td>
<td>The time (in milliseconds) this task is back pressured per second.</td>
<td>Meter</td> <td>Meter</td>
</tr> </tr>
<tr> <tr>
......
...@@ -1250,7 +1250,12 @@ Certain RocksDB native metrics are available but disabled by default, you can fi ...@@ -1250,7 +1250,12 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
</tr> </tr>
<tr> <tr>
<td>idleTimeMsPerSecond</td> <td>idleTimeMsPerSecond</td>
<td>The time (in milliseconds) this task is idle (either has no data to process or it is back pressured) per second.</td> <td>The time (in milliseconds) this task is idle (has no data to process) per second. Idle time excludes back pressured time, so if the task is back pressured it is not idle.</td>
<td>Meter</td>
</tr>
<tr>
<td>backPressuredTimeMsPerSecond</td>
<td>The time (in milliseconds) this task is back pressured per second.</td>
<td>Meter</td> <td>Meter</td>
</tr> </tr>
<tr> <tr>
......
...@@ -64,7 +64,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { ...@@ -64,7 +64,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
/** For broadcast mode, a single BufferBuilder is shared by all subpartitions. */ /** For broadcast mode, a single BufferBuilder is shared by all subpartitions. */
private BufferBuilder broadcastBufferBuilder; private BufferBuilder broadcastBufferBuilder;
private Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter()); private Meter backPressuredTimeMsPerSecond = new MeterView(new SimpleCounter());
public BufferWritingResultPartition( public BufferWritingResultPartition(
String owningTaskName, String owningTaskName,
...@@ -193,7 +193,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { ...@@ -193,7 +193,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
@Override @Override
public void setMetricGroup(TaskIOMetricGroup metrics) { public void setMetricGroup(TaskIOMetricGroup metrics) {
super.setMetricGroup(metrics); super.setMetricGroup(metrics);
idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond(); backPressuredTimeMsPerSecond = metrics.getBackPressuredTimePerSecond();
} }
@Override @Override
...@@ -335,10 +335,11 @@ public abstract class BufferWritingResultPartition extends ResultPartition { ...@@ -335,10 +335,11 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
return bufferBuilder; return bufferBuilder;
} }
final long start = System.currentTimeMillis();
try { try {
long start = System.currentTimeMillis();
bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition); bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start); long backPressuredTime = System.currentTimeMillis() - start;
backPressuredTimeMsPerSecond.markEvent(backPressuredTime);
return bufferBuilder; return bufferBuilder;
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for buffer"); throw new IOException("Interrupted while waiting for buffer");
...@@ -377,8 +378,8 @@ public abstract class BufferWritingResultPartition extends ResultPartition { ...@@ -377,8 +378,8 @@ public abstract class BufferWritingResultPartition extends ResultPartition {
} }
@VisibleForTesting @VisibleForTesting
public Meter getIdleTimeMsPerSecond() { public Meter getBackPressuredTimeMsPerSecond() {
return idleTimeMsPerSecond; return backPressuredTimeMsPerSecond;
} }
@VisibleForTesting @VisibleForTesting
......
...@@ -67,4 +67,5 @@ public class MetricNames { ...@@ -67,4 +67,5 @@ public class MetricNames {
} }
public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE; public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE;
public static final String TASK_BACK_PRESSURED_TIME = "backPressuredTimeMs" + SUFFIX_RATE;
} }
...@@ -46,6 +46,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { ...@@ -46,6 +46,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Meter numRecordsOutRate; private final Meter numRecordsOutRate;
private final Meter numBuffersOutRate; private final Meter numBuffersOutRate;
private final Meter idleTimePerSecond; private final Meter idleTimePerSecond;
private final Meter backPressuredTimePerSecond;
public TaskIOMetricGroup(TaskMetricGroup parent) { public TaskIOMetricGroup(TaskMetricGroup parent) {
super(parent); super(parent);
...@@ -68,6 +69,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { ...@@ -68,6 +69,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
this.idleTimePerSecond = this.idleTimePerSecond =
meter(MetricNames.TASK_IDLE_TIME, new MeterView(new SimpleCounter())); meter(MetricNames.TASK_IDLE_TIME, new MeterView(new SimpleCounter()));
this.backPressuredTimePerSecond =
meter(MetricNames.TASK_BACK_PRESSURED_TIME, new MeterView(new SimpleCounter()));
} }
public IOMetrics createSnapshot() { public IOMetrics createSnapshot() {
...@@ -102,6 +105,10 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> { ...@@ -102,6 +105,10 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
return idleTimePerSecond; return idleTimePerSecond;
} }
public Meter getBackPressuredTimePerSecond() {
return backPressuredTimePerSecond;
}
// ============================================================================================ // ============================================================================================
// Metric Reuse // Metric Reuse
// ============================================================================================ // ============================================================================================
......
...@@ -493,7 +493,7 @@ public class ResultPartitionTest { ...@@ -493,7 +493,7 @@ public class ResultPartitionTest {
} }
@Test @Test
public void testIdleTime() throws IOException, InterruptedException { public void testIdleAndBackPressuredTime() throws IOException, InterruptedException {
// setup // setup
int bufferSize = 1024; int bufferSize = 1024;
NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize); NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize);
...@@ -509,8 +509,8 @@ public class ResultPartitionTest { ...@@ -509,8 +509,8 @@ public class ResultPartitionTest {
Buffer buffer = readView.getNextBuffer().buffer(); Buffer buffer = readView.getNextBuffer().buffer();
assertNotNull(buffer); assertNotNull(buffer);
// idle time is zero when there is buffer available. // back-pressured time is zero when there is buffer available.
assertEquals(0, resultPartition.getIdleTimeMsPerSecond().getCount()); assertEquals(0, resultPartition.getBackPressuredTimeMsPerSecond().getCount());
CountDownLatch syncLock = new CountDownLatch(1); CountDownLatch syncLock = new CountDownLatch(1);
final Thread requestThread = final Thread requestThread =
...@@ -536,7 +536,8 @@ public class ResultPartitionTest { ...@@ -536,7 +536,8 @@ public class ResultPartitionTest {
requestThread.join(); requestThread.join();
Assert.assertThat( Assert.assertThat(
resultPartition.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L)); resultPartition.getBackPressuredTimeMsPerSecond().getCount(),
Matchers.greaterThan(0L));
assertNotNull(readView.getNextBuffer().buffer()); assertNotNull(readView.getNextBuffer().buffer());
} }
......
...@@ -23,6 +23,7 @@ import org.apache.flink.configuration.TaskManagerOptions; ...@@ -23,6 +23,7 @@ import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputStatus; import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
...@@ -47,6 +48,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; ...@@ -47,6 +48,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
...@@ -72,6 +74,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; ...@@ -72,6 +74,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Suspension;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox; import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
...@@ -304,7 +307,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab ...@@ -304,7 +307,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
this.recordWriter = createRecordWriterDelegate(configuration, environment); this.recordWriter = createRecordWriterDelegate(configuration, environment);
this.actionExecutor = Preconditions.checkNotNull(actionExecutor); this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor); this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
this.mailboxProcessor.initMetric(environment.getMetricGroup());
this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor(); this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor();
this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment); this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
this.asyncOperationsThreadPool = this.asyncOperationsThreadPool =
...@@ -400,25 +402,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab ...@@ -400,25 +402,20 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
controller.allActionsCompleted(); controller.allActionsCompleted();
return; return;
} }
CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume));
}
/** TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
* Considers three scenarios to combine input and output futures: 1. Both input and output are final long startTime = System.currentTimeMillis();
* unavailable. 2. Only input is unavailable. 3. Only output is unavailable. Meter timer;
*/ CompletableFuture<?> resumeFuture;
@VisibleForTesting if (!recordWriter.isAvailable()) {
CompletableFuture<?> getInputOutputJointFuture(InputStatus status) { timer = ioMetrics.getBackPressuredTimePerSecond();
if (status == InputStatus.NOTHING_AVAILABLE && !recordWriter.isAvailable()) { resumeFuture = recordWriter.getAvailableFuture();
return CompletableFuture.allOf(
inputProcessor.getAvailableFuture(), recordWriter.getAvailableFuture());
} else if (status == InputStatus.NOTHING_AVAILABLE) {
return inputProcessor.getAvailableFuture();
} else { } else {
return recordWriter.getAvailableFuture(); timer = ioMetrics.getIdleTimeMsPerSecond();
resumeFuture = inputProcessor.getAvailableFuture();
} }
assertNoException(
resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(), timer, startTime)));
} }
private void resetSynchronousSavepointId() { private void resetSynchronousSavepointId() {
...@@ -1321,4 +1318,22 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab ...@@ -1321,4 +1318,22 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends Ab
protected long getAsyncCheckpointStartDelayNanos() { protected long getAsyncCheckpointStartDelayNanos() {
return latestAsyncCheckpointStartDelayNanos; return latestAsyncCheckpointStartDelayNanos;
} }
private static class ResumeWrapper implements Runnable {
private final Suspension suspendedDefaultAction;
private final Meter timer;
private final long startTime;
public ResumeWrapper(Suspension suspendedDefaultAction, Meter timer, long startTime) {
this.suspendedDefaultAction = suspendedDefaultAction;
this.timer = timer;
this.startTime = startTime;
}
@Override
public void run() {
timer.markEvent(System.currentTimeMillis() - startTime);
suspendedDefaultAction.resume();
}
}
} }
...@@ -19,10 +19,6 @@ package org.apache.flink.streaming.runtime.tasks.mailbox; ...@@ -19,10 +19,6 @@ package org.apache.flink.streaming.runtime.tasks.mailbox;
import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.streaming.api.operators.MailboxExecutor; import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MailboxClosedException; import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MailboxClosedException;
...@@ -91,8 +87,6 @@ public class MailboxProcessor implements Closeable { ...@@ -91,8 +87,6 @@ public class MailboxProcessor implements Closeable {
private final StreamTaskActionExecutor actionExecutor; private final StreamTaskActionExecutor actionExecutor;
private Meter idleTime = new MeterView(new SimpleCounter());
@VisibleForTesting @VisibleForTesting
public MailboxProcessor() { public MailboxProcessor() {
this(MailboxDefaultAction.Controller::suspendDefaultAction); this(MailboxDefaultAction.Controller::suspendDefaultAction);
...@@ -131,10 +125,6 @@ public class MailboxProcessor implements Closeable { ...@@ -131,10 +125,6 @@ public class MailboxProcessor implements Closeable {
return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this); return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
} }
public void initMetric(TaskMetricGroup metricGroup) {
idleTime = metricGroup.getIOMetricGroup().getIdleTimeMsPerSecond();
}
/** Lifecycle method to close the mailbox for action submission. */ /** Lifecycle method to close the mailbox for action submission. */
public void prepareClose() { public void prepareClose() {
mailbox.quiesce(); mailbox.quiesce();
...@@ -315,9 +305,7 @@ public class MailboxProcessor implements Closeable { ...@@ -315,9 +305,7 @@ public class MailboxProcessor implements Closeable {
while (isDefaultActionUnavailable() && isMailboxLoopRunning()) { while (isDefaultActionUnavailable() && isMailboxLoopRunning()) {
maybeMail = mailbox.tryTake(MIN_PRIORITY); maybeMail = mailbox.tryTake(MIN_PRIORITY);
if (!maybeMail.isPresent()) { if (!maybeMail.isPresent()) {
long start = System.currentTimeMillis();
maybeMail = Optional.of(mailbox.take(MIN_PRIORITY)); maybeMail = Optional.of(mailbox.take(MIN_PRIORITY));
idleTime.markEvent(System.currentTimeMillis() - start);
} }
maybeMail.get().run(); maybeMail.get().run();
processed = true; processed = true;
...@@ -354,11 +342,6 @@ public class MailboxProcessor implements Closeable { ...@@ -354,11 +342,6 @@ public class MailboxProcessor implements Closeable {
return mailboxLoopRunning; return mailboxLoopRunning;
} }
@VisibleForTesting
public Meter getIdleTime() {
return idleTime;
}
@VisibleForTesting @VisibleForTesting
public boolean hasMail() { public boolean hasMail() {
return mailbox.hasMail(); return mailbox.hasMail();
......
...@@ -51,6 +51,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriter; ...@@ -51,6 +51,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironment;
...@@ -116,6 +117,7 @@ import org.apache.flink.util.TestLogger; ...@@ -116,6 +117,7 @@ import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.SupplierWithException; import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
...@@ -1241,6 +1243,7 @@ public class StreamTaskTest extends TestLogger { ...@@ -1241,6 +1243,7 @@ public class StreamTaskTest extends TestLogger {
@Test @Test
public void testProcessWithUnAvailableOutput() throws Exception { public void testProcessWithUnAvailableOutput() throws Exception {
final long sleepTime = 42;
try (final MockEnvironment environment = setupEnvironment(new boolean[] {true, false})) { try (final MockEnvironment environment = setupEnvironment(new boolean[] {true, false})) {
final int numberOfProcessCalls = 10; final int numberOfProcessCalls = 10;
final AvailabilityTestInputProcessor inputProcessor = final AvailabilityTestInputProcessor inputProcessor =
...@@ -1253,6 +1256,7 @@ public class StreamTaskTest extends TestLogger { ...@@ -1253,6 +1256,7 @@ public class StreamTaskTest extends TestLogger {
final RunnableWithException completeFutureTask = final RunnableWithException completeFutureTask =
() -> { () -> {
Thread.sleep(sleepTime + 1);
assertEquals(1, inputProcessor.currentNumProcessCalls); assertEquals(1, inputProcessor.currentNumProcessCalls);
assertTrue(task.mailboxProcessor.isDefaultActionUnavailable()); assertTrue(task.mailboxProcessor.isDefaultActionUnavailable());
environment.getWriter(1).getAvailableFuture().complete(null); environment.getWriter(1).getAvailableFuture().complete(null);
...@@ -1266,11 +1270,39 @@ public class StreamTaskTest extends TestLogger { ...@@ -1266,11 +1270,39 @@ public class StreamTaskTest extends TestLogger {
}, },
"This task will submit another task to execute after processing input once."); "This task will submit another task to execute after processing input once.");
TaskIOMetricGroup ioMetricGroup =
task.getEnvironment().getMetricGroup().getIOMetricGroup();
task.invoke(); task.invoke();
assertThat(
ioMetricGroup.getBackPressuredTimePerSecond().getCount(),
Matchers.greaterThanOrEqualTo(sleepTime));
assertThat(ioMetricGroup.getIdleTimeMsPerSecond().getCount(), is(0L));
assertEquals(numberOfProcessCalls, inputProcessor.currentNumProcessCalls); assertEquals(numberOfProcessCalls, inputProcessor.currentNumProcessCalls);
} }
} }
@Test
public void testProcessWithUnAvailableInput() throws Exception {
final long unAvailableTime = 42;
try (final MockEnvironment environment = setupEnvironment(new boolean[] {true, true})) {
final UnAvailableTestInputProcessor inputProcessor =
new UnAvailableTestInputProcessor(unAvailableTime);
final StreamTask task =
new MockStreamTaskBuilder(environment)
.setStreamInputProcessor(inputProcessor)
.build();
TaskIOMetricGroup ioMetricGroup =
task.getEnvironment().getMetricGroup().getIOMetricGroup();
task.invoke();
assertThat(
ioMetricGroup.getIdleTimeMsPerSecond().getCount(),
Matchers.greaterThanOrEqualTo(unAvailableTime));
assertThat(ioMetricGroup.getBackPressuredTimePerSecond().getCount(), is(0L));
}
}
private MockEnvironment setupEnvironment(boolean[] outputAvailabilities) { private MockEnvironment setupEnvironment(boolean[] outputAvailabilities) {
final Configuration configuration = new Configuration(); final Configuration configuration = new Configuration();
new MockStreamConfig(configuration, outputAvailabilities.length); new MockStreamConfig(configuration, outputAvailabilities.length);
...@@ -1433,6 +1465,76 @@ public class StreamTaskTest extends TestLogger { ...@@ -1433,6 +1465,76 @@ public class StreamTaskTest extends TestLogger {
} }
} }
/**
* A stream input processor implementation with input unavailable for a specified amount of
* time, after which processor is closing.
*/
private static class UnAvailableTestInputProcessor implements StreamInputProcessor {
private final AvailabilityHelper availabilityProvider = new AvailabilityHelper();
private final Thread timerThread;
private boolean timerTriggered;
private volatile Exception asyncException;
public UnAvailableTestInputProcessor(long unAvailableTime) {
timerThread =
new Thread() {
@Override
public void run() {
try {
Thread.sleep(unAvailableTime);
availabilityProvider
.getUnavailableToResetAvailable()
.complete(null);
} catch (Exception e) {
asyncException = e;
}
}
};
}
@Override
public InputStatus processInput() {
maybeTriggerTimer();
return availabilityProvider.isAvailable()
? InputStatus.END_OF_INPUT
: InputStatus.NOTHING_AVAILABLE;
}
@Override
public CompletableFuture<Void> prepareSnapshot(
ChannelStateWriter channelStateWriter, final long checkpointId) {
return FutureUtils.completedVoidFuture();
}
@Override
public void close() throws IOException {
if (asyncException != null) {
throw new IOException(asyncException);
}
try {
timerThread.join();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public CompletableFuture<?> getAvailableFuture() {
maybeTriggerTimer();
return availabilityProvider.getAvailableFuture();
}
private void maybeTriggerTimer() {
if (timerTriggered) {
return;
}
timerTriggered = true;
timerThread.start();
}
}
private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> { private static class BlockingCloseStreamOperator extends AbstractStreamOperator<Void> {
private static final long serialVersionUID = -9042150529568008847L; private static final long serialVersionUID = -9042150529568008847L;
......
...@@ -23,13 +23,11 @@ import org.apache.flink.streaming.api.operators.MailboxExecutor; ...@@ -23,13 +23,11 @@ import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.util.FlinkException; import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.RunnableWithException;
import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
...@@ -254,71 +252,6 @@ public class TaskMailboxProcessorTest { ...@@ -254,71 +252,6 @@ public class TaskMailboxProcessorTest {
mailboxProcessor.allActionsCompleted(); mailboxProcessor.allActionsCompleted();
} }
@Test
public void testNoIdleTimeWhenBusy() throws InterruptedException {
final AtomicReference<MailboxDefaultAction.Suspension> suspendedActionRef =
new AtomicReference<>();
final int totalSwitches = 10;
AtomicInteger count = new AtomicInteger();
MailboxThread mailboxThread =
new MailboxThread() {
@Override
public void runDefaultAction(Controller controller) {
int currentCount = count.incrementAndGet();
if (currentCount == totalSwitches) {
controller.allActionsCompleted();
}
}
};
mailboxThread.start();
final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor();
mailboxThread.signalStart();
mailboxThread.join();
Assert.assertEquals(0, mailboxProcessor.getIdleTime().getCount());
Assert.assertEquals(totalSwitches, count.get());
}
@Test
public void testIdleTime() throws InterruptedException {
final AtomicReference<MailboxDefaultAction.Suspension> suspendedActionRef =
new AtomicReference<>();
final int totalSwitches = 2;
CountDownLatch syncLock = new CountDownLatch(1);
MailboxThread mailboxThread =
new MailboxThread() {
int count = 0;
@Override
public void runDefaultAction(Controller controller) {
// If this is violated, it means that the default action was invoked while
// we assumed suspension
Assert.assertTrue(
suspendedActionRef.compareAndSet(
null, controller.suspendDefaultAction()));
++count;
if (count == totalSwitches) {
controller.allActionsCompleted();
}
syncLock.countDown();
}
};
mailboxThread.start();
final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor();
mailboxThread.signalStart();
syncLock.await();
Thread.sleep(10);
mailboxProcessor
.getMailboxExecutor(DEFAULT_PRIORITY)
.execute(suspendedActionRef.get()::resume, "resume");
mailboxThread.join();
Assert.assertThat(mailboxProcessor.getIdleTime().getCount(), Matchers.greaterThan(0L));
}
private static MailboxProcessor start(MailboxThread mailboxThread) { private static MailboxProcessor start(MailboxThread mailboxThread) {
mailboxThread.start(); mailboxThread.start();
final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor(); final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册