提交 3e73496c 编写于 作者: S Stephan Ewen

[FLINK-2464] [tests] Make buffer spilling test robust to Java 6.

上级 d738430c
......@@ -30,14 +30,15 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.*;
......@@ -91,6 +92,8 @@ public class BufferSpillerTest {
assertFalse(spiller.getCurrentChannel().isOpen());
assertFalse(spiller.getCurrentSpillFile().exists());
}
checkNoTempFilesRemain();
}
// ------------------------------------------------------------------------
......@@ -189,14 +192,17 @@ public class BufferSpillerTest {
final int sequences = 10;
final Random rnd = new Random();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
final SequenceConsumer consumer = new SequenceConsumer(error, sequences);
consumer.start();
final int maxNumEventsAndBuffers = 30000;
final int maxNumChannels = 1656;
int sequencesConsumed = 0;
ArrayDeque<SequenceToConsume> pendingSequences = new ArrayDeque<SequenceToConsume>();
SequenceToConsume currentSequence = null;
int currentNumEvents = 0;
int currentNumRecordAndEvents = 0;
// do multiple spilling / rolling over rounds
for (int round = 0; round < 2*sequences; round++) {
......@@ -214,43 +220,110 @@ public class BufferSpillerTest {
final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
// generate sequence
for (int i = 0; i < numEventsAndBuffers; i++) {
boolean isEvent = rnd.nextDouble() < 0.05d;
if (isEvent) {
BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
events.add(evt);
spiller.add(evt);
int generated = 0;
while (generated < numEventsAndBuffers) {
if (currentSequence == null || rnd.nextDouble() < 0.5) {
// add a new record
boolean isEvent = rnd.nextDouble() < 0.05;
if (isEvent) {
BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
events.add(evt);
spiller.add(evt);
}
else {
BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
spiller.add(evt);
}
generated++;
}
else {
BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
spiller.add(evt);
// consume a record
BufferOrEvent next = currentSequence.sequence.getNext();
assertNotNull(next);
if (next.isEvent()) {
BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
assertEquals(expected.getEvent(), next.getEvent());
assertEquals(expected.getChannelIndex(), next.getChannelIndex());
}
else {
Random validationRnd = currentSequence.bufferRnd;
validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels));
}
currentNumRecordAndEvents++;
if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
// done with the sequence
currentSequence.sequence.cleanup();
sequencesConsumed++;
// validate we had all events
assertEquals(currentSequence.events.size(), currentNumEvents);
// reset
currentSequence = pendingSequences.pollFirst();
if (currentSequence != null) {
currentSequence.sequence.open();
}
currentNumRecordAndEvents = 0;
currentNumEvents = 0;
}
}
}
// reset and create reader
// done generating a sequence. queue it for consumption
bufferRnd.setSeed(bufferSeed);
BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels);
consumer.queue(stc);
if (currentSequence == null) {
currentSequence = stc;
stc.sequence.open();
}
else {
pendingSequences.addLast(stc);
}
}
}
// wait for the consumer
consumer.join(180000);
assertFalse("sequence consumer did not finish its work in time", consumer.isAlive());
// validate there was no error in the consumer
if (error.get() != null) {
Throwable t = error.get();
if (t instanceof Error) {
throw (Error) t;
// consume all the remainder
while (currentSequence != null) {
// consume a record
BufferOrEvent next = currentSequence.sequence.getNext();
assertNotNull(next);
if (next.isEvent()) {
BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
assertEquals(expected.getEvent(), next.getEvent());
assertEquals(expected.getChannelIndex(), next.getChannelIndex());
}
else {
throw new Exception("Error while consuming the spilled records", t);
Random validationRnd = currentSequence.bufferRnd;
validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels));
}
currentNumRecordAndEvents++;
if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
// done with the sequence
currentSequence.sequence.cleanup();
sequencesConsumed++;
// validate we had all events
assertEquals(currentSequence.events.size(), currentNumEvents);
// reset
currentSequence = pendingSequences.pollFirst();
if (currentSequence != null) {
currentSequence.sequence.open();
}
currentNumRecordAndEvents = 0;
currentNumEvents = 0;
}
}
assertEquals(sequences, sequencesConsumed);
}
catch (Exception e) {
e.printStackTrace();
......@@ -300,10 +373,17 @@ public class BufferSpillerTest {
}
}
}
// ------------------------------------------------------------------------
// Async Consumer
// ------------------------------------------------------------------------
private static void checkNoTempFilesRemain() {
// validate that all temp files have been removed
for (File dir : IO_MANAGER.getSpillingDirectories()) {
for (String file : dir.list()) {
if (file != null && !(file.equals(".") || file.equals(".."))) {
fail("barrier buffer did not clean up temp files. remaining file: " + file);
}
}
}
}
private static class SequenceToConsume {
......@@ -323,81 +403,4 @@ public class BufferSpillerTest {
this.numChannels = numChannels;
}
}
private static class SequenceConsumer extends Thread {
private final AtomicReference<Throwable> error;
private final BlockingQueue<SequenceToConsume> sequences;
private final int numSequencesToConsume;
private int consumedSequences;
private SequenceConsumer(AtomicReference<Throwable> error, int numSequencesToConsume) {
super("Sequence Consumer");
setDaemon(true);
this.error = error;
this.numSequencesToConsume = numSequencesToConsume;
this.sequences = new LinkedBlockingQueue<SequenceToConsume>();
}
@Override
public void run() {
try {
while (consumedSequences < numSequencesToConsume) {
// get next sequence
SequenceToConsume nextSequence = sequences.take();
// wait a bit, allow some stuff to queue up
Thread.sleep(50);
BufferSpiller.SpilledBufferOrEventSequence seq = nextSequence.sequence;
ArrayList<BufferOrEvent> events = nextSequence.events;
Random bufferRnd = nextSequence.bufferRnd;
int numBuffersAndEvents = nextSequence.numBuffersAndEvents;
int numChannels = nextSequence.numChannels;
LOG.info("Reading sequence {}", consumedSequences);
// consume sequence
seq.open();
int numEvent = 0;
for (int i = 0; i < numBuffersAndEvents; i++) {
BufferOrEvent next = seq.getNext();
assertNotNull(next);
if (next.isEvent()) {
BufferOrEvent expected = events.get(numEvent++);
assertEquals(expected.getEvent(), next.getEvent());
assertEquals(expected.getChannelIndex(), next.getChannelIndex());
}
else {
validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
}
}
// no further data
assertNull(seq.getNext());
// all events need to be consumed
assertEquals(events.size(), numEvent);
// remove all temp files
seq.cleanup();
consumedSequences++;
}
}
catch (Throwable t) {
error.set(t);
}
}
public void queue(SequenceToConsume next) {
sequences.add(next);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册