提交 e838e5a6 编写于 作者: D Denghui Dong 提交者: D-D-H

[Backport] 8234888: EventStream::close doesn't abort streaming thread

Summary:

Test Plan: jdk/jfr

Reviewed-by: yuleil

Issue: https://github.com/alibaba/dragonwell8/issues/112
上级 235b51f4
...@@ -48,7 +48,7 @@ import jdk.jfr.internal.SecuritySupport; ...@@ -48,7 +48,7 @@ import jdk.jfr.internal.SecuritySupport;
* an event stream. * an event stream.
*/ */
abstract class AbstractEventStream implements EventStream { abstract class AbstractEventStream implements EventStream {
private final static AtomicLong counter = new AtomicLong(1); private final static AtomicLong counter = new AtomicLong(0);
private final Object terminated = new Object(); private final Object terminated = new Object();
private final Runnable flushOperation = () -> dispatcher().runFlushActions(); private final Runnable flushOperation = () -> dispatcher().runFlushActions();
......
...@@ -106,6 +106,7 @@ public final class ChunkParser { ...@@ -106,6 +106,7 @@ public final class ChunkParser {
private Runnable flushOperation; private Runnable flushOperation;
private ParserConfiguration configuration; private ParserConfiguration configuration;
private volatile boolean closed;
public ChunkParser(RecordingInput input) throws IOException { public ChunkParser(RecordingInput input) throws IOException {
this(input, new ParserConfiguration()); this(input, new ParserConfiguration());
...@@ -284,6 +285,9 @@ public final class ChunkParser { ...@@ -284,6 +285,9 @@ public final class ChunkParser {
Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes"); Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Waiting for more data (streaming). Read so far: " + chunkHeader.getChunkSize() + " bytes");
} }
while (true) { while (true) {
if (closed) {
return true;
}
if (chunkHeader.getLastNanos() > filterEnd) { if (chunkHeader.getLastNanos() > filterEnd) {
return true; return true;
} }
...@@ -455,4 +459,9 @@ public final class ChunkParser { ...@@ -455,4 +459,9 @@ public final class ChunkParser {
return chunkHeader.isFinalChunk(); return chunkHeader.isFinalChunk();
} }
public void close() {
this.closed = true;
Utils.notifyFlush();
}
} }
...@@ -69,6 +69,9 @@ public class EventDirectoryStream extends AbstractEventStream { ...@@ -69,6 +69,9 @@ public class EventDirectoryStream extends AbstractEventStream {
setClosed(true); setClosed(true);
dispatcher().runCloseActions(); dispatcher().runCloseActions();
repositoryFiles.close(); repositoryFiles.close();
if (currentParser != null) {
currentParser.close();
}
} }
@Override @Override
......
...@@ -25,11 +25,15 @@ ...@@ -25,11 +25,15 @@
package jdk.jfr.api.consumer.recordingstream; package jdk.jfr.api.consumer.recordingstream;
import java.time.Instant;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import jdk.jfr.Event; import jdk.jfr.Event;
import jdk.jfr.Recording;
import jdk.jfr.consumer.EventStream;
import jdk.jfr.consumer.RecordingStream; import jdk.jfr.consumer.RecordingStream;
/** /**
...@@ -50,6 +54,7 @@ public class TestClose { ...@@ -50,6 +54,7 @@ public class TestClose {
testCloseTwice(); testCloseTwice();
testCloseStreaming(); testCloseStreaming();
testCloseMySelf(); testCloseMySelf();
testCloseNoEvents();
} }
private static void testCloseMySelf() throws Exception { private static void testCloseMySelf() throws Exception {
...@@ -121,6 +126,26 @@ public class TestClose { ...@@ -121,6 +126,26 @@ public class TestClose {
log("Leaving testCloseTwice()"); log("Leaving testCloseTwice()");
} }
private static void testCloseNoEvents() throws Exception {
try (Recording r = new Recording()) {
r.start();
CountDownLatch finished = new CountDownLatch(2);
AtomicReference<Thread> streamingThread = new AtomicReference<>();
try (EventStream es = EventStream.openRepository()) {
es.setStartTime(Instant.EPOCH);
es.onFlush( () -> {
streamingThread.set(Thread.currentThread());
finished.countDown();;
});
es.startAsync();
finished.await();
} // <- EventStream::close should terminate thread
while (streamingThread.get().isAlive()) {
Thread.sleep(10);
}
}
}
private static void log(String msg) { private static void log(String msg) {
System.out.println(msg); System.out.println(msg);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册