diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java index 6e02542617b5907f945f2d98552336c42669045e..fcbf9d553f4e599aa13a3e44a268e4829a25ae9b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolDestroyTest.java @@ -104,11 +104,10 @@ public class LocalBufferPoolDestroyTest { * @return Flag indicating whether the Thread is in a blocking buffer * request or not */ - private boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) { + public static boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) { if (stackTrace.length >= 3) { return stackTrace[0].getMethodName().equals("wait") && - stackTrace[1].getMethodName().equals("requestBuffer") && - stackTrace[2].getMethodName().equals("requestBufferBlocking"); + stackTrace[1].getClassName().equals(LocalBufferPool.class.getName()); } else { return false; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java index b2b118ce42f4744a15e5cc377b43422ab71ff004..1d09ce6ba91620e7f5190d9748d401fab05b2172 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java @@ -38,15 +38,18 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhe import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning; import org.apache.flink.types.LongValue; import org.apache.flink.util.TestLogger; + import org.junit.Test; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - +import static org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest.isInBlockingBufferRequest; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -171,21 +174,6 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger { } } - /** - * Returns whether the stack trace represents a Thread in a blocking buffer - * request. - * - * @param stackTrace Stack trace of the Thread to check - * - * @return Flag indicating whether the Thread is in a blocking buffer - * request or not - */ - private boolean isInBlockingBufferRequest(StackTraceElement[] stackTrace) { - return stackTrace.length >= 3 && stackTrace[0].getMethodName().equals("wait") && - stackTrace[1].getMethodName().equals("requestBuffer") && - stackTrace[2].getMethodName().equals("requestBufferBlocking"); - } - /** * Invokable emitting records in a separate Thread (not the main Task * thread).