diff --git a/docs/ops/metrics.md b/docs/ops/metrics.md index 9b7a6c34630865d571ed7a4c4ef7d948b15125ce..e2e937055d980dac1461c7deead4ecea9a47f14d 100644 --- a/docs/ops/metrics.md +++ b/docs/ops/metrics.md @@ -1250,7 +1250,12 @@ Certain RocksDB native metrics are available but disabled by default, you can fi idleTimeMsPerSecond - The time (in milliseconds) this task is idle (either has no data to process or it is back pressured) per second. + The time (in milliseconds) this task is idle (has no data to process) per second. Idle time excludes back pressured time, so if the task is back pressured it is not idle. + Meter + + + backPressuredTimeMsPerSecond + The time (in milliseconds) this task is back pressured per second. Meter diff --git a/docs/ops/metrics.zh.md b/docs/ops/metrics.zh.md index eac4b556618f2cb847aa4d3b30fbdf278a637c45..19fcba36c60371de88a6c085d1bc495ba2974519 100644 --- a/docs/ops/metrics.zh.md +++ b/docs/ops/metrics.zh.md @@ -1250,7 +1250,12 @@ Certain RocksDB native metrics are available but disabled by default, you can fi idleTimeMsPerSecond - The time (in milliseconds) this task is idle (either has no data to process or it is back pressured) per second. + The time (in milliseconds) this task is idle (has no data to process) per second. Idle time excludes back pressured time, so if the task is back pressured it is not idle. + Meter + + + backPressuredTimeMsPerSecond + The time (in milliseconds) this task is back pressured per second. Meter diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java index 5a36831d3c3f21bbcd2ee17321b2bb106063f985..c5e11380b9553d75f10538bf01d7d51272fc537f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java @@ -64,7 +64,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { /** For broadcast mode, a single BufferBuilder is shared by all subpartitions. */ private BufferBuilder broadcastBufferBuilder; - private Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter()); + private Meter backPressuredTimeMsPerSecond = new MeterView(new SimpleCounter()); public BufferWritingResultPartition( String owningTaskName, @@ -193,7 +193,7 @@ public abstract class BufferWritingResultPartition extends ResultPartition { @Override public void setMetricGroup(TaskIOMetricGroup metrics) { super.setMetricGroup(metrics); - idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond(); + backPressuredTimeMsPerSecond = metrics.getBackPressuredTimePerSecond(); } @Override @@ -335,10 +335,11 @@ public abstract class BufferWritingResultPartition extends ResultPartition { return bufferBuilder; } - final long start = System.currentTimeMillis(); try { + long start = System.currentTimeMillis(); bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition); - idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start); + long backPressuredTime = System.currentTimeMillis() - start; + backPressuredTimeMsPerSecond.markEvent(backPressuredTime); return bufferBuilder; } catch (InterruptedException e) { throw new IOException("Interrupted while waiting for buffer"); @@ -377,8 +378,8 @@ public abstract class BufferWritingResultPartition extends ResultPartition { } @VisibleForTesting - public Meter getIdleTimeMsPerSecond() { - return idleTimeMsPerSecond; + public Meter getBackPressuredTimeMsPerSecond() { + return backPressuredTimeMsPerSecond; } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index ccb442b04d9cc14f8dec8e979b6ede080ef35139..6a58de7742d7668413c0adf8be2e0e363f210f69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -67,4 +67,5 @@ public class MetricNames { } public static final String TASK_IDLE_TIME = "idleTimeMs" + SUFFIX_RATE; + public static final String TASK_BACK_PRESSURED_TIME = "backPressuredTimeMs" + SUFFIX_RATE; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java index 1a449f90c3749a80d9a3db77303b4ef359e0f75e..9be21809adb0944e798d908d2f44709a8261e5fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java @@ -46,6 +46,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup { private final Meter numRecordsOutRate; private final Meter numBuffersOutRate; private final Meter idleTimePerSecond; + private final Meter backPressuredTimePerSecond; public TaskIOMetricGroup(TaskMetricGroup parent) { super(parent); @@ -68,6 +69,8 @@ public class TaskIOMetricGroup extends ProxyMetricGroup { this.idleTimePerSecond = meter(MetricNames.TASK_IDLE_TIME, new MeterView(new SimpleCounter())); + this.backPressuredTimePerSecond = + meter(MetricNames.TASK_BACK_PRESSURED_TIME, new MeterView(new SimpleCounter())); } public IOMetrics createSnapshot() { @@ -102,6 +105,10 @@ public class TaskIOMetricGroup extends ProxyMetricGroup { return idleTimePerSecond; } + public Meter getBackPressuredTimePerSecond() { + return backPressuredTimePerSecond; + } + // ============================================================================================ // Metric Reuse // ============================================================================================ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java index 8d01a66b51da478574bb4139868d7a9715dc115a..059c815d41cf169c302a92d7c1b993b16eabb4f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java @@ -493,7 +493,7 @@ public class ResultPartitionTest { } @Test - public void testIdleTime() throws IOException, InterruptedException { + public void testIdleAndBackPressuredTime() throws IOException, InterruptedException { // setup int bufferSize = 1024; NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize); @@ -509,8 +509,8 @@ public class ResultPartitionTest { Buffer buffer = readView.getNextBuffer().buffer(); assertNotNull(buffer); - // idle time is zero when there is buffer available. - assertEquals(0, resultPartition.getIdleTimeMsPerSecond().getCount()); + // back-pressured time is zero when there is buffer available. + assertEquals(0, resultPartition.getBackPressuredTimeMsPerSecond().getCount()); CountDownLatch syncLock = new CountDownLatch(1); final Thread requestThread = @@ -536,7 +536,8 @@ public class ResultPartitionTest { requestThread.join(); Assert.assertThat( - resultPartition.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L)); + resultPartition.getBackPressuredTimeMsPerSecond().getCount(), + Matchers.greaterThan(0L)); assertNotNull(readView.getNextBuffer().buffer()); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 03fe18cfd84207da954c35431280183d488adb2e..8d3ce706246b3cad90b15d01996c5d7c7601e33f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.io.InputStatus; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; @@ -47,6 +48,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.plugable.SerializationDelegate; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; @@ -72,6 +74,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; +import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction.Suspension; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory; import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor; import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox; @@ -304,7 +307,6 @@ public abstract class StreamTask> extends Ab this.recordWriter = createRecordWriterDelegate(configuration, environment); this.actionExecutor = Preconditions.checkNotNull(actionExecutor); this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor); - this.mailboxProcessor.initMetric(environment.getMetricGroup()); this.mainMailboxExecutor = mailboxProcessor.getMainMailboxExecutor(); this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment); this.asyncOperationsThreadPool = @@ -400,25 +402,20 @@ public abstract class StreamTask> extends Ab controller.allActionsCompleted(); return; } - CompletableFuture jointFuture = getInputOutputJointFuture(status); - MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction(); - assertNoException(jointFuture.thenRun(suspendedDefaultAction::resume)); - } - /** - * Considers three scenarios to combine input and output futures: 1. Both input and output are - * unavailable. 2. Only input is unavailable. 3. Only output is unavailable. - */ - @VisibleForTesting - CompletableFuture getInputOutputJointFuture(InputStatus status) { - if (status == InputStatus.NOTHING_AVAILABLE && !recordWriter.isAvailable()) { - return CompletableFuture.allOf( - inputProcessor.getAvailableFuture(), recordWriter.getAvailableFuture()); - } else if (status == InputStatus.NOTHING_AVAILABLE) { - return inputProcessor.getAvailableFuture(); + TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup(); + final long startTime = System.currentTimeMillis(); + Meter timer; + CompletableFuture resumeFuture; + if (!recordWriter.isAvailable()) { + timer = ioMetrics.getBackPressuredTimePerSecond(); + resumeFuture = recordWriter.getAvailableFuture(); } else { - return recordWriter.getAvailableFuture(); + timer = ioMetrics.getIdleTimeMsPerSecond(); + resumeFuture = inputProcessor.getAvailableFuture(); } + assertNoException( + resumeFuture.thenRun(new ResumeWrapper(controller.suspendDefaultAction(), timer, startTime))); } private void resetSynchronousSavepointId() { @@ -1321,4 +1318,22 @@ public abstract class StreamTask> extends Ab protected long getAsyncCheckpointStartDelayNanos() { return latestAsyncCheckpointStartDelayNanos; } + + private static class ResumeWrapper implements Runnable { + private final Suspension suspendedDefaultAction; + private final Meter timer; + private final long startTime; + + public ResumeWrapper(Suspension suspendedDefaultAction, Meter timer, long startTime) { + this.suspendedDefaultAction = suspendedDefaultAction; + this.timer = timer; + this.startTime = startTime; + } + + @Override + public void run() { + timer.markEvent(System.currentTimeMillis() - startTime); + suspendedDefaultAction.resume(); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java index 9626c0adade9b368d3afed9f7db8a28a78c84613..699b9e9378bb010743064a26f2b2a16d6e625543 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java @@ -19,10 +19,6 @@ package org.apache.flink.streaming.runtime.tasks.mailbox; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.metrics.Meter; -import org.apache.flink.metrics.MeterView; -import org.apache.flink.metrics.SimpleCounter; -import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.streaming.api.operators.MailboxExecutor; import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox.MailboxClosedException; @@ -91,8 +87,6 @@ public class MailboxProcessor implements Closeable { private final StreamTaskActionExecutor actionExecutor; - private Meter idleTime = new MeterView(new SimpleCounter()); - @VisibleForTesting public MailboxProcessor() { this(MailboxDefaultAction.Controller::suspendDefaultAction); @@ -131,10 +125,6 @@ public class MailboxProcessor implements Closeable { return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this); } - public void initMetric(TaskMetricGroup metricGroup) { - idleTime = metricGroup.getIOMetricGroup().getIdleTimeMsPerSecond(); - } - /** Lifecycle method to close the mailbox for action submission. */ public void prepareClose() { mailbox.quiesce(); @@ -315,9 +305,7 @@ public class MailboxProcessor implements Closeable { while (isDefaultActionUnavailable() && isMailboxLoopRunning()) { maybeMail = mailbox.tryTake(MIN_PRIORITY); if (!maybeMail.isPresent()) { - long start = System.currentTimeMillis(); maybeMail = Optional.of(mailbox.take(MIN_PRIORITY)); - idleTime.markEvent(System.currentTimeMillis() - start); } maybeMail.get().run(); processed = true; @@ -354,11 +342,6 @@ public class MailboxProcessor implements Closeable { return mailboxLoopRunning; } - @VisibleForTesting - public Meter getIdleTime() { - return idleTime; - } - @VisibleForTesting public boolean hasMail() { return mailbox.hasMail(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 6706dd83a13243956cdd8051e9bd12c1b8f67055..ffcf60334b942c10730f21fe6a4c9b6aced55d43 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriter; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.operators.testutils.MockEnvironment; @@ -116,6 +117,7 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.SupplierWithException; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -1241,6 +1243,7 @@ public class StreamTaskTest extends TestLogger { @Test public void testProcessWithUnAvailableOutput() throws Exception { + final long sleepTime = 42; try (final MockEnvironment environment = setupEnvironment(new boolean[] {true, false})) { final int numberOfProcessCalls = 10; final AvailabilityTestInputProcessor inputProcessor = @@ -1253,6 +1256,7 @@ public class StreamTaskTest extends TestLogger { final RunnableWithException completeFutureTask = () -> { + Thread.sleep(sleepTime + 1); assertEquals(1, inputProcessor.currentNumProcessCalls); assertTrue(task.mailboxProcessor.isDefaultActionUnavailable()); environment.getWriter(1).getAvailableFuture().complete(null); @@ -1266,11 +1270,39 @@ public class StreamTaskTest extends TestLogger { }, "This task will submit another task to execute after processing input once."); + TaskIOMetricGroup ioMetricGroup = + task.getEnvironment().getMetricGroup().getIOMetricGroup(); task.invoke(); + assertThat( + ioMetricGroup.getBackPressuredTimePerSecond().getCount(), + Matchers.greaterThanOrEqualTo(sleepTime)); + assertThat(ioMetricGroup.getIdleTimeMsPerSecond().getCount(), is(0L)); assertEquals(numberOfProcessCalls, inputProcessor.currentNumProcessCalls); } } + @Test + public void testProcessWithUnAvailableInput() throws Exception { + final long unAvailableTime = 42; + try (final MockEnvironment environment = setupEnvironment(new boolean[] {true, true})) { + final UnAvailableTestInputProcessor inputProcessor = + new UnAvailableTestInputProcessor(unAvailableTime); + final StreamTask task = + new MockStreamTaskBuilder(environment) + .setStreamInputProcessor(inputProcessor) + .build(); + + TaskIOMetricGroup ioMetricGroup = + task.getEnvironment().getMetricGroup().getIOMetricGroup(); + task.invoke(); + + assertThat( + ioMetricGroup.getIdleTimeMsPerSecond().getCount(), + Matchers.greaterThanOrEqualTo(unAvailableTime)); + assertThat(ioMetricGroup.getBackPressuredTimePerSecond().getCount(), is(0L)); + } + } + private MockEnvironment setupEnvironment(boolean[] outputAvailabilities) { final Configuration configuration = new Configuration(); new MockStreamConfig(configuration, outputAvailabilities.length); @@ -1433,6 +1465,76 @@ public class StreamTaskTest extends TestLogger { } } + /** + * A stream input processor implementation with input unavailable for a specified amount of + * time, after which processor is closing. + */ + private static class UnAvailableTestInputProcessor implements StreamInputProcessor { + private final AvailabilityHelper availabilityProvider = new AvailabilityHelper(); + private final Thread timerThread; + + private boolean timerTriggered; + + private volatile Exception asyncException; + + public UnAvailableTestInputProcessor(long unAvailableTime) { + timerThread = + new Thread() { + @Override + public void run() { + try { + Thread.sleep(unAvailableTime); + availabilityProvider + .getUnavailableToResetAvailable() + .complete(null); + } catch (Exception e) { + asyncException = e; + } + } + }; + } + + @Override + public InputStatus processInput() { + maybeTriggerTimer(); + return availabilityProvider.isAvailable() + ? InputStatus.END_OF_INPUT + : InputStatus.NOTHING_AVAILABLE; + } + + @Override + public CompletableFuture prepareSnapshot( + ChannelStateWriter channelStateWriter, final long checkpointId) { + return FutureUtils.completedVoidFuture(); + } + + @Override + public void close() throws IOException { + if (asyncException != null) { + throw new IOException(asyncException); + } + try { + timerThread.join(); + } catch (InterruptedException e) { + throw new IOException(e); + } + } + + @Override + public CompletableFuture getAvailableFuture() { + maybeTriggerTimer(); + return availabilityProvider.getAvailableFuture(); + } + + private void maybeTriggerTimer() { + if (timerTriggered) { + return; + } + timerTriggered = true; + timerThread.start(); + } + } + private static class BlockingCloseStreamOperator extends AbstractStreamOperator { private static final long serialVersionUID = -9042150529568008847L; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java index e787338e8dc68d2898a11d0d9d1123c22d1a8246..94b5f5d990daaed27c7b9db34c4c51da3f159234 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/mailbox/TaskMailboxProcessorTest.java @@ -23,13 +23,11 @@ import org.apache.flink.streaming.api.operators.MailboxExecutor; import org.apache.flink.util.FlinkException; import org.apache.flink.util.function.RunnableWithException; -import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -254,71 +252,6 @@ public class TaskMailboxProcessorTest { mailboxProcessor.allActionsCompleted(); } - @Test - public void testNoIdleTimeWhenBusy() throws InterruptedException { - final AtomicReference suspendedActionRef = - new AtomicReference<>(); - final int totalSwitches = 10; - - AtomicInteger count = new AtomicInteger(); - MailboxThread mailboxThread = - new MailboxThread() { - @Override - public void runDefaultAction(Controller controller) { - int currentCount = count.incrementAndGet(); - if (currentCount == totalSwitches) { - controller.allActionsCompleted(); - } - } - }; - mailboxThread.start(); - final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor(); - - mailboxThread.signalStart(); - mailboxThread.join(); - - Assert.assertEquals(0, mailboxProcessor.getIdleTime().getCount()); - Assert.assertEquals(totalSwitches, count.get()); - } - - @Test - public void testIdleTime() throws InterruptedException { - final AtomicReference suspendedActionRef = - new AtomicReference<>(); - final int totalSwitches = 2; - - CountDownLatch syncLock = new CountDownLatch(1); - MailboxThread mailboxThread = - new MailboxThread() { - int count = 0; - - @Override - public void runDefaultAction(Controller controller) { - // If this is violated, it means that the default action was invoked while - // we assumed suspension - Assert.assertTrue( - suspendedActionRef.compareAndSet( - null, controller.suspendDefaultAction())); - ++count; - if (count == totalSwitches) { - controller.allActionsCompleted(); - } - syncLock.countDown(); - } - }; - mailboxThread.start(); - final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor(); - mailboxThread.signalStart(); - - syncLock.await(); - Thread.sleep(10); - mailboxProcessor - .getMailboxExecutor(DEFAULT_PRIORITY) - .execute(suspendedActionRef.get()::resume, "resume"); - mailboxThread.join(); - Assert.assertThat(mailboxProcessor.getIdleTime().getCount(), Matchers.greaterThan(0L)); - } - private static MailboxProcessor start(MailboxThread mailboxThread) { mailboxThread.start(); final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor();