diff --git a/src/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java b/src/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java index cf22a8758ee421227c1396d215f4a2afb333b5bf..8a6049b9cb049508709f8a852e0d7411f6f11956 100644 --- a/src/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java +++ b/src/share/classes/jdk/jfr/internal/consumer/AbstractEventStream.java @@ -76,9 +76,10 @@ abstract class AbstractEventStream implements EventStream { abstract public void close(); protected final Dispatcher dispatcher() { - if (configuration.hasChanged()) { + if (configuration.hasChanged()) { // quick check synchronized (configuration) { dispatcher = new Dispatcher(configuration); + configuration.setChanged(false); } } return dispatcher; diff --git a/src/share/classes/jdk/jfr/internal/consumer/ChunkParser.java b/src/share/classes/jdk/jfr/internal/consumer/ChunkParser.java index 005c4b7504445712428427f6fe46f503887a2c7a..4d5b90ec48c1bc42a38caf62ebc84958da264428 100644 --- a/src/share/classes/jdk/jfr/internal/consumer/ChunkParser.java +++ b/src/share/classes/jdk/jfr/internal/consumer/ChunkParser.java @@ -190,44 +190,40 @@ public final class ChunkParser { * * @param awaitNewEvents wait for new data. */ - RecordedEvent readStreamingEvent(boolean awaitNewEvents) throws IOException { + RecordedEvent readStreamingEvent() throws IOException { long absoluteChunkEnd = chunkHeader.getEnd(); - while (true) { - RecordedEvent event = readEvent(); - if (event != null) { - return event; - } - if (!awaitNewEvents) { - return null; - } - long lastValid = absoluteChunkEnd; - long metadataPoistion = chunkHeader.getMetataPosition(); - long contantPosition = chunkHeader.getConstantPoolPosition(); - chunkFinished = awaitUpdatedHeader(absoluteChunkEnd, configuration.filterEnd); - if (chunkFinished) { - Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end"); - return null; - } - absoluteChunkEnd = chunkHeader.getEnd(); - // Read metadata and constant pools for the next segment - if (chunkHeader.getMetataPosition() != metadataPoistion) { - Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers"); - MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata); - ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); - parsers = factory.getParsers(); - typeMap = factory.getTypeMap(); - updateConfiguration();; - } - if (contantPosition != chunkHeader.getConstantPoolPosition()) { - Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values"); - constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false)); - fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart()); - constantLookups.forEach(c -> c.getLatestPool().setResolving()); - constantLookups.forEach(c -> c.getLatestPool().resolve()); - constantLookups.forEach(c -> c.getLatestPool().setResolved()); - } - input.position(lastValid); + RecordedEvent event = readEvent(); + if (event != null) { + return event; + } + long lastValid = absoluteChunkEnd; + long metadataPosition = chunkHeader.getMetataPosition(); + long contantPosition = chunkHeader.getConstantPoolPosition(); + chunkFinished = awaitUpdatedHeader(absoluteChunkEnd, configuration.filterEnd); + if (chunkFinished) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "At chunk end"); + return null; } + absoluteChunkEnd = chunkHeader.getEnd(); + // Read metadata and constant pools for the next segment + if (chunkHeader.getMetataPosition() != metadataPosition) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new metadata in chunk. Rebuilding types and parsers"); + MetadataDescriptor metadata = chunkHeader.readMetadata(previousMetadata); + ParserFactory factory = new ParserFactory(metadata, constantLookups, timeConverter); + parsers = factory.getParsers(); + typeMap = factory.getTypeMap(); + updateConfiguration(); + } + if (contantPosition != chunkHeader.getConstantPoolPosition()) { + Logger.log(LogTag.JFR_SYSTEM_PARSER, LogLevel.INFO, "Found new constant pool data. Filling up pools with new values"); + constantLookups.forEach(c -> c.getLatestPool().setAllResolved(false)); + fillConstantPools(contantPosition + chunkHeader.getAbsoluteChunkStart()); + constantLookups.forEach(c -> c.getLatestPool().setResolving()); + constantLookups.forEach(c -> c.getLatestPool().resolve()); + constantLookups.forEach(c -> c.getLatestPool().setResolved()); + } + input.position(lastValid); + return null; } /** diff --git a/src/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java b/src/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java index 570bb89cfc9fd414c981bc582660bd01528cd81a..66a01e7d0c6ac6af5ab38d3e48983ed3cbe1c3ff 100644 --- a/src/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java +++ b/src/share/classes/jdk/jfr/internal/consumer/EventDirectoryStream.java @@ -105,8 +105,8 @@ public class EventDirectoryStream extends AbstractEventStream { } protected void processRecursionSafe() throws IOException { + Dispatcher lastDisp = null; Dispatcher disp = dispatcher(); - Path path; boolean validStartTime = recording != null || disp.startTime != null; if (validStartTime) { @@ -125,18 +125,20 @@ public class EventDirectoryStream extends AbstractEventStream { long filterEnd = disp.endTime != null ? disp.endNanos: Long.MAX_VALUE; while (!isClosed()) { - boolean awaitnewEvent = false; while (!isClosed() && !currentParser.isChunkFinished()) { disp = dispatcher(); - ParserConfiguration pc = disp.parserConfiguration; - pc.filterStart = filterStart; - pc.filterEnd = filterEnd; - currentParser.updateConfiguration(pc, true); - currentParser.setFlushOperation(getFlushOperation()); - if (pc.isOrdered()) { - awaitnewEvent = processOrdered(disp, awaitnewEvent); + if (disp != lastDisp) { + ParserConfiguration pc = disp.parserConfiguration; + pc.filterStart = filterStart; + pc.filterEnd = filterEnd; + currentParser.updateConfiguration(pc, true); + currentParser.setFlushOperation(getFlushOperation()); + lastDisp = disp; + } + if (disp.parserConfiguration.isOrdered()) { + processOrdered(disp); } else { - awaitnewEvent = processUnordered(disp, awaitnewEvent); + processUnordered(disp); } if (currentParser.getStartNanos() + currentParser.getChunkDuration() > filterEnd) { close(); @@ -182,29 +184,24 @@ public class EventDirectoryStream extends AbstractEventStream { return recording.getFinalChunkStartNanos() >= currentParser.getStartNanos(); } - private boolean processOrdered(Dispatcher c, boolean awaitNewEvents) throws IOException { + private void processOrdered(Dispatcher c) throws IOException { if (sortedCache == null) { sortedCache = new RecordedEvent[100_000]; } int index = 0; while (true) { - RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents); + RecordedEvent e = currentParser.readStreamingEvent(); if (e == null) { - // wait for new event with next call to - // readStreamingEvent() - awaitNewEvents = true; break; } - awaitNewEvents = false; if (index == sortedCache.length) { sortedCache = Arrays.copyOf(sortedCache, sortedCache.length * 2); } sortedCache[index++] = e; } - // no events found if (index == 0 && currentParser.isChunkFinished()) { - return awaitNewEvents; + return; } // at least 2 events, sort them if (index > 1) { @@ -213,12 +210,12 @@ public class EventDirectoryStream extends AbstractEventStream { for (int i = 0; i < index; i++) { c.dispatch(sortedCache[i]); } - return awaitNewEvents; + return; } - private boolean processUnordered(Dispatcher c, boolean awaitNewEvents) throws IOException { + private boolean processUnordered(Dispatcher c) throws IOException { while (true) { - RecordedEvent e = currentParser.readStreamingEvent(awaitNewEvents); + RecordedEvent e = currentParser.readStreamingEvent(); if (e == null) { return true; } else { diff --git a/src/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java b/src/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java index 3fd937cb26ce5072ad415b4a833ef4b797c81949..9fd47bfdf66d6a9a5e15584e5f79a9de9dae7d76 100644 --- a/src/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java +++ b/src/share/classes/jdk/jfr/internal/consumer/StreamConfiguration.java @@ -121,4 +121,8 @@ final class StreamConfiguration { public boolean hasChanged() { return changed; } + + public void setChanged(boolean changed) { + this.changed = changed; + } } diff --git a/test/jdk/jfr/api/consumer/recordingstream/TestClose.java b/test/jdk/jfr/api/consumer/recordingstream/TestClose.java index cb1d6c8648c486ad9d6fb1f9aab382c9caa2990e..fc25d3145ec5aecdf7f482f15d14ef5def1bb13b 100644 --- a/test/jdk/jfr/api/consumer/recordingstream/TestClose.java +++ b/test/jdk/jfr/api/consumer/recordingstream/TestClose.java @@ -26,9 +26,7 @@ package jdk.jfr.api.consumer.recordingstream; import java.time.Instant; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import jdk.jfr.Event; @@ -57,96 +55,82 @@ public class TestClose { testCloseNoEvents(); } - private static void testCloseMySelf() throws Exception { - log("Entering testCloseMySelf()"); - CountDownLatch l1 = new CountDownLatch(1); - CountDownLatch l2 = new CountDownLatch(1); - RecordingStream r = new RecordingStream(); - r.onEvent(e -> { - try { - l1.await(); - r.close(); - l2.countDown(); - } catch (InterruptedException ie) { - throw new Error(ie); - } - }); - r.startAsync(); - CloseEvent c = new CloseEvent(); - c.commit(); - l1.countDown(); - l2.await(); - log("Leaving testCloseMySelf()"); - } + private static void testCloseUnstarted() { + System.out.println("testCloseUnstarted()"); - private static void testCloseStreaming() throws Exception { - log("Entering testCloseStreaming()"); - CountDownLatch streaming = new CountDownLatch(1); - RecordingStream r = new RecordingStream(); - AtomicLong count = new AtomicLong(); - r.onEvent(e -> { - if (count.incrementAndGet() > 100) { - streaming.countDown(); - } - }); - r.startAsync(); - CompletableFuture streamingLoop = CompletableFuture.runAsync(() -> { - while (true) { - CloseEvent c = new CloseEvent(); - c.commit(); - } - }); - streaming.await(); - r.close(); - streamingLoop.cancel(true); - log("Leaving testCloseStreaming()"); + try (RecordingStream r = new RecordingStream()) { + r.close(); + } } private static void testCloseStarted() { - log("Entering testCloseStarted()"); - RecordingStream r = new RecordingStream(); - r.startAsync(); - r.close(); - log("Leaving testCloseStarted()"); - } + System.out.println("testCloseStarted()"); - private static void testCloseUnstarted() { - log("Entering testCloseUnstarted()"); - RecordingStream r = new RecordingStream(); - r.close(); - log("Leaving testCloseUnstarted()"); + try (RecordingStream r = new RecordingStream()) { + r.startAsync(); + } // <- Close } private static void testCloseTwice() { - log("Entering testCloseTwice()"); - RecordingStream r = new RecordingStream(); - r.startAsync(); - r.close(); - r.close(); - log("Leaving testCloseTwice()"); + System.out.println("Entering testCloseTwice()"); + + try (RecordingStream r = new RecordingStream()) { + r.startAsync(); + r.close(); + } // <- Second close + } + + private static void testCloseStreaming() throws Exception { + System.out.println("Entering testCloseStreaming()"); + + EventProducer p = new EventProducer(); + p.start(); + CountDownLatch streaming = new CountDownLatch(1); + try (RecordingStream r = new RecordingStream()) { + r.onEvent(e -> { + streaming.countDown(); + }); + r.startAsync(); + streaming.await(); + } // <- Close + p.kill(); + } + + private static void testCloseMySelf() throws Exception { + System.out.println("testCloseMySelf()"); + + CountDownLatch closed = new CountDownLatch(1); + try (RecordingStream r = new RecordingStream()) { + r.onEvent(e -> { + r.close(); // <- Close + closed.countDown(); + }); + r.startAsync(); + CloseEvent c = new CloseEvent(); + c.commit(); + closed.await(); + } } private static void testCloseNoEvents() throws Exception { + System.out.println("testCloseNoEvents()"); + try (Recording r = new Recording()) { r.start(); CountDownLatch finished = new CountDownLatch(2); AtomicReference streamingThread = new AtomicReference<>(); try (EventStream es = EventStream.openRepository()) { es.setStartTime(Instant.EPOCH); - es.onFlush( () -> { + es.onFlush(() -> { streamingThread.set(Thread.currentThread()); - finished.countDown();; + finished.countDown(); }); es.startAsync(); finished.await(); - } // <- EventStream::close should terminate thread + } // <- Close should terminate thread while (streamingThread.get().isAlive()) { Thread.sleep(10); } } } - - private static void log(String msg) { - System.out.println(msg); - } } diff --git a/test/jdk/jfr/api/consumer/recordingstream/TestOnEvent.java b/test/jdk/jfr/api/consumer/recordingstream/TestOnEvent.java index 8d4114ffe7651ca61a7e5124e18464b584028358..6ff60a43bf783fe3dd18f4ef9a60c83738690d97 100644 --- a/test/jdk/jfr/api/consumer/recordingstream/TestOnEvent.java +++ b/test/jdk/jfr/api/consumer/recordingstream/TestOnEvent.java @@ -57,6 +57,7 @@ public class TestOnEvent { testOnEvent(); testNamedEvent(); testTwoEventWithSameName(); + testOnEventAfterStart(); } private static void testOnEventNull() { @@ -148,6 +149,29 @@ public class TestOnEvent { log("Leaving testOnEvent()"); } + private static void testOnEventAfterStart() { + try (RecordingStream r = new RecordingStream()) { + EventProducer p = new EventProducer(); + p.start(); + Thread addHandler = new Thread(() -> { + r.onEvent(e -> { + // Got event, close stream + r.close(); + }); + }); + r.onFlush(() -> { + // Only add handler once + if (!"started".equals(addHandler.getName())) { + addHandler.setName("started"); + addHandler.start(); + } + }); + r.start(); + p.kill(); + } + } + + private static void log(String msg) { System.out.println(msg); }