From 027179c59c6f72b9eb4857e915ac1d4419531111 Mon Sep 17 00:00:00 2001 From: sewen Date: Sat, 13 Apr 2013 19:06:08 +0200 Subject: [PATCH] Cleaned up pom.xml Small fix for pact runtime tests. --- .../io/MemoryAccessSpeedBenchmark.java | 176 ------------- .../concurrent/BlockingBackChannelTest.java | 239 +++++++++--------- .../iterative/concurrent/BrokerTest.java | 172 +++++++------ .../iterative/concurrent/StringPair.java | 25 +- .../concurrent/SuperstepBarrierTest.java | 160 ++++++------ ...Test.java => HashVsSortMiniBenchmark.java} | 13 +- .../runtime/task/util/OutputEmitterTest.java | 47 ++-- pom.xml | 71 +++--- 8 files changed, 367 insertions(+), 536 deletions(-) delete mode 100644 pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/io/MemoryAccessSpeedBenchmark.java rename pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/{HashVsSortTest.java => HashVsSortMiniBenchmark.java} (98%) diff --git a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/io/MemoryAccessSpeedBenchmark.java b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/io/MemoryAccessSpeedBenchmark.java deleted file mode 100644 index 33059ffc896..00000000000 --- a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/io/MemoryAccessSpeedBenchmark.java +++ /dev/null @@ -1,176 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ - -package eu.stratosphere.pact.runtime.io; - -import java.lang.ref.WeakReference; -import java.util.Random; - -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -public class MemoryAccessSpeedBenchmark { - private static final int ARRAY_LENGTH = 1024 * 1024 * 164; - - private static final int SEGMENT_OFFSET = 1024 * 1024 * 16; - - private static final int SEGMENT_LENGTH = 1024 * 1024 * 128; // 128M segment - - private static final int NUMBER_OF_ITERATIONS = 16; // x8 iterations (i.e. test for 1G read + 1G write throughput ) - - private static final long RANDOM_SEED = 235646234421L; - - private static byte[] sourceBytes; - - private static byte[] targetBytes; - - private static MemorySegmentHardReference segmentHardReference; - - private static MemorySegmentWeakReference segmentWeakReference; - - private static Random random = new Random(); - - @BeforeClass - public static void initialize() { - sourceBytes = new byte[SEGMENT_LENGTH]; - targetBytes = new byte[ARRAY_LENGTH]; - MemorySegmentDescriptor descriptor = new MemorySegmentDescriptor(targetBytes, SEGMENT_OFFSET, SEGMENT_LENGTH); - WeakReference descriptorReference = new WeakReference( - descriptor); - segmentHardReference = new MemorySegmentHardReference(descriptor); - segmentWeakReference = new MemorySegmentWeakReference(descriptorReference); - } - - @AfterClass - public static void destruct() { - sourceBytes = null; - targetBytes = null; - segmentHardReference = null; - segmentWeakReference = null; - } - - @Before - public void setUp() { - random.setSeed(RANDOM_SEED); - random.nextBytes(sourceBytes); - } - - @Test - public void testIndirectAccessWithWeakReference() { - for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { - for (int j = 0; j < SEGMENT_LENGTH; j++) { - segmentWeakReference.write(j, sourceBytes[j]); - } - - for (int j = 0; j < SEGMENT_LENGTH; j++) { - assert (sourceBytes[j] == segmentWeakReference.read(j)); - } - } - } - - @Test - public void testIndirectAccessWithHardReference() { - for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { - for (int j = 0; j < SEGMENT_LENGTH; j++) { - segmentHardReference.write(j, sourceBytes[j]); - } - - for (int j = 0; j < SEGMENT_LENGTH; j++) { - assert (sourceBytes[j] == segmentHardReference.read(j)); - } - } - } - - @Test - public void testDirectAccess() { - for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { - for (int j = 0; j < SEGMENT_LENGTH; j++) { - targetBytes[SEGMENT_OFFSET + j] = sourceBytes[j]; - } - - for (int j = 0; j < SEGMENT_LENGTH; j++) { - assert (sourceBytes[j] == targetBytes[SEGMENT_OFFSET + j]); - } - } - } - - private static final class MemorySegmentHardReference { - private final MemorySegmentDescriptor descriptor; - - public MemorySegmentHardReference(MemorySegmentDescriptor descriptor) { - this.descriptor = descriptor; - } - - public byte read(int position) { - if (position < descriptor.start || descriptor.end >= position) { - return descriptor.memory[(position + descriptor.start)]; - } else { - throw new IndexOutOfBoundsException(); - } - } - - public void write(int position, byte data) { - if (position < descriptor.start || descriptor.end >= position) { - descriptor.memory[(position + descriptor.start)] = data; - } else { - throw new IndexOutOfBoundsException(); - } - } - } - - private static final class MemorySegmentWeakReference { - private final WeakReference descriptorReference; - - public MemorySegmentWeakReference(WeakReference descriptorReference) { - this.descriptorReference = descriptorReference; - } - - public byte read(int position) { - MemorySegmentDescriptor descriptor = descriptorReference.get(); - - if (position < descriptor.start || descriptor.end >= position) { - return descriptor.memory[(position + descriptor.start)]; - } else { - throw new IndexOutOfBoundsException(); - } - } - - public void write(int position, byte data) { - MemorySegmentDescriptor descriptor = descriptorReference.get(); - - if (position < descriptor.start || descriptor.end >= position) { - descriptor.memory[(position + descriptor.start)] = data; - } else { - throw new IndexOutOfBoundsException(); - } - } - } - - private static final class MemorySegmentDescriptor { - public final byte[] memory; - - public final int start; - - public final int end; - - public MemorySegmentDescriptor(byte[] bytes, int start, int end) { - this.memory = bytes; - this.start = start; - this.end = end; - } - } -} diff --git a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/BlockingBackChannelTest.java b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/BlockingBackChannelTest.java index 884c02ea397..587c580a8be 100644 --- a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/BlockingBackChannelTest.java +++ b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/BlockingBackChannelTest.java @@ -31,121 +31,128 @@ import static org.junit.Assert.assertEquals; public class BlockingBackChannelTest { - private static final int NUM_ITERATIONS = 3; - private static final Integer INPUT_COMPLETELY_PROCESSED_MESSAGE = 1; - - @Test - public void multiThreaded() throws InterruptedException { - - BlockingQueue dataChannel = new ArrayBlockingQueue(1); - List actionLog = Lists.newArrayList(); - - SerializedUpdateBuffer buffer = Mockito.mock(SerializedUpdateBuffer.class); - BlockingBackChannel channel = new BlockingBackChannel(buffer); - - Thread head = new Thread(new IterationHead(channel, dataChannel, actionLog)); - Thread tail = new Thread(new IterationTail(channel, dataChannel, actionLog)); - - tail.start(); - head.start(); - - head.join(); - tail.join(); - - int action = 0; - for (String log : actionLog) { - System.out.println("ACTION " + (++action) + ": " + log); - } - - assertEquals(12, actionLog.size()); - - assertEquals("head sends data", actionLog.get(0)); - assertEquals("tail receives data", actionLog.get(1)); - assertEquals("tail writes in iteration 0", actionLog.get(2)); - assertEquals("head reads in iteration 0", actionLog.get(3)); - assertEquals("head sends data", actionLog.get(4)); - assertEquals("tail receives data", actionLog.get(5)); - assertEquals("tail writes in iteration 1", actionLog.get(6)); - assertEquals("head reads in iteration 1", actionLog.get(7)); - assertEquals("head sends data", actionLog.get(8)); - assertEquals("tail receives data", actionLog.get(9)); - assertEquals("tail writes in iteration 2", actionLog.get(10)); - assertEquals("head reads in iteration 2", actionLog.get(11)); - } - - class IterationHead implements Runnable { - - private final BlockingBackChannel backChannel; - private final BlockingQueue dataChannel; - private final Random random; - private final List actionLog; - - IterationHead(BlockingBackChannel backChannel, BlockingQueue dataChannel, List actionLog) { - this.backChannel = backChannel; - this.dataChannel = dataChannel; - this.actionLog = actionLog; - random = new Random(); - } - - @Override - public void run() { - processInputAndSendMessageThroughDataChannel(); - for (int n = 0; n < NUM_ITERATIONS; n++) { - try { - backChannel.getReadEndAfterSuperstepEnded(); - actionLog.add("head reads in iteration " + n); - Thread.sleep(random.nextInt(100)); - // we don't send through the data channel in the last iteration, we would send to the output task - if (n != NUM_ITERATIONS - 1) { - processInputAndSendMessageThroughDataChannel(); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } - - void processInputAndSendMessageThroughDataChannel() { - actionLog.add("head sends data"); - dataChannel.offer(INPUT_COMPLETELY_PROCESSED_MESSAGE); - } - } - - class IterationTail implements Runnable { - - private final BlockingBackChannel backChannel; - private final BlockingQueue dataChannel; - private final Random random; - private final List actionLog; - - IterationTail(BlockingBackChannel backChannel, BlockingQueue dataChannel, List actionLog) { - this.backChannel = backChannel; - this.dataChannel = dataChannel; - this.actionLog = actionLog; - random = new Random(); - } - - @Override - public void run() { - try { - for (int n = 0; n < NUM_ITERATIONS; n++) { - DataOutputView writeEnd = backChannel.getWriteEnd(); - readInputFromDataChannel(); - Thread.sleep(random.nextInt(10)); - DataInputView inputView = Mockito.mock(DataInputView.class); - actionLog.add("tail writes in iteration " + n); - writeEnd.write(inputView, 1); - backChannel.notifyOfEndOfSuperstep(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - void readInputFromDataChannel() throws InterruptedException { - dataChannel.take(); - actionLog.add("tail receives data"); - } - } + private static final int NUM_ITERATIONS = 3; + + private static final Integer INPUT_COMPLETELY_PROCESSED_MESSAGE = 1; + + @Test + public void multiThreaded() throws InterruptedException { + + BlockingQueue dataChannel = new ArrayBlockingQueue(1); + List actionLog = Lists.newArrayList(); + + SerializedUpdateBuffer buffer = Mockito.mock(SerializedUpdateBuffer.class); + BlockingBackChannel channel = new BlockingBackChannel(buffer); + + Thread head = new Thread(new IterationHead(channel, dataChannel, actionLog)); + Thread tail = new Thread(new IterationTail(channel, dataChannel, actionLog)); + + tail.start(); + head.start(); + + head.join(); + tail.join(); + +// int action = 0; +// for (String log : actionLog) { +// System.out.println("ACTION " + (++action) + ": " + log); +// } + + assertEquals(12, actionLog.size()); + + assertEquals("head sends data", actionLog.get(0)); + assertEquals("tail receives data", actionLog.get(1)); + assertEquals("tail writes in iteration 0", actionLog.get(2)); + assertEquals("head reads in iteration 0", actionLog.get(3)); + assertEquals("head sends data", actionLog.get(4)); + assertEquals("tail receives data", actionLog.get(5)); + assertEquals("tail writes in iteration 1", actionLog.get(6)); + assertEquals("head reads in iteration 1", actionLog.get(7)); + assertEquals("head sends data", actionLog.get(8)); + assertEquals("tail receives data", actionLog.get(9)); + assertEquals("tail writes in iteration 2", actionLog.get(10)); + assertEquals("head reads in iteration 2", actionLog.get(11)); + } + + class IterationHead implements Runnable { + + private final BlockingBackChannel backChannel; + + private final BlockingQueue dataChannel; + + private final Random random; + + private final List actionLog; + + IterationHead(BlockingBackChannel backChannel, BlockingQueue dataChannel, List actionLog) { + this.backChannel = backChannel; + this.dataChannel = dataChannel; + this.actionLog = actionLog; + random = new Random(); + } + + @Override + public void run() { + processInputAndSendMessageThroughDataChannel(); + for (int n = 0; n < NUM_ITERATIONS; n++) { + try { + backChannel.getReadEndAfterSuperstepEnded(); + actionLog.add("head reads in iteration " + n); + Thread.sleep(random.nextInt(100)); + // we don't send through the data channel in the last iteration, we would send to the output task + if (n != NUM_ITERATIONS - 1) { + processInputAndSendMessageThroughDataChannel(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + void processInputAndSendMessageThroughDataChannel() { + actionLog.add("head sends data"); + dataChannel.offer(INPUT_COMPLETELY_PROCESSED_MESSAGE); + } + } + + class IterationTail implements Runnable { + + private final BlockingBackChannel backChannel; + + private final BlockingQueue dataChannel; + + private final Random random; + + private final List actionLog; + + IterationTail(BlockingBackChannel backChannel, BlockingQueue dataChannel, List actionLog) { + this.backChannel = backChannel; + this.dataChannel = dataChannel; + this.actionLog = actionLog; + random = new Random(); + } + + @Override + public void run() { + try { + for (int n = 0; n < NUM_ITERATIONS; n++) { + DataOutputView writeEnd = backChannel.getWriteEnd(); + readInputFromDataChannel(); + Thread.sleep(random.nextInt(10)); + DataInputView inputView = Mockito.mock(DataInputView.class); + actionLog.add("tail writes in iteration " + n); + writeEnd.write(inputView, 1); + backChannel.notifyOfEndOfSuperstep(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + void readInputFromDataChannel() throws InterruptedException { + dataChannel.take(); + actionLog.add("tail receives data"); + } + } } diff --git a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/BrokerTest.java b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/BrokerTest.java index 5894f2f2967..af0bfb2b2e5 100644 --- a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/BrokerTest.java +++ b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/BrokerTest.java @@ -32,90 +32,94 @@ import static org.junit.Assert.assertEquals; public class BrokerTest { - @Test - public void mediation() throws Exception { - Random random = new Random(); - for (int n = 0; n < 20; n++) { - mediate(random.nextInt(10) + 1); - } - } - - void mediate(int subtasks) throws InterruptedException, ExecutionException { - - ExecutorService executorService = Executors.newFixedThreadPool(subtasks * 2); - - List> tasks = Lists.newArrayList(); - Broker broker = new Broker(); - - for (int subtask = 0; subtask < subtasks; subtask++) { - tasks.add(new IterationHead(broker, subtask, "value" + subtask)); - tasks.add(new IterationTail(broker, subtask)); - } - - Collections.shuffle(tasks); - - int numSuccessfulHandovers = 0; - for (Future future : executorService.invokeAll(tasks)) { - StringPair stringPair = future.get(); - if (stringPair != null) { - assertEquals("value" + stringPair.getFirst(), stringPair.getSecond()); - numSuccessfulHandovers++; - } - } - - assertEquals(subtasks, numSuccessfulHandovers); - } - - class IterationHead implements Callable { - - private final Random random; - private final Broker broker; - private final String key; - private final String value; - - IterationHead(Broker broker, Integer key, String value) { - this.broker = broker; - this.key = String.valueOf(key); - this.value = value; - random = new Random(); - } - - @Override - public StringPair call() throws Exception { - Thread.sleep(random.nextInt(10)); - System.out.println("Head " + key + " hands in " + value); - broker.handIn(key, value); - Thread.sleep(random.nextInt(10)); - return null; - } - - } - - class IterationTail implements Callable { - - private final Random random; - private final Broker broker; - private final String key; - - IterationTail(Broker broker, Integer key) { - this.broker = broker; - this.key = String.valueOf(key); - random = new Random(); - } - - @Override - public StringPair call() throws Exception { - - Thread.sleep(random.nextInt(10)); - System.out.println("Tail " + key + " asks for handover"); - String value = broker.get(key); - - System.out.println("Tail " + key + " received " + value); - Preconditions.checkNotNull(value); - - return new StringPair(key, value); - } - } + @Test + public void mediation() throws Exception { + Random random = new Random(); + for (int n = 0; n < 20; n++) { + mediate(random.nextInt(10) + 1); + } + } + void mediate(int subtasks) throws InterruptedException, ExecutionException { + + ExecutorService executorService = Executors.newFixedThreadPool(subtasks * 2); + + List> tasks = Lists.newArrayList(); + Broker broker = new Broker(); + + for (int subtask = 0; subtask < subtasks; subtask++) { + tasks.add(new IterationHead(broker, subtask, "value" + subtask)); + tasks.add(new IterationTail(broker, subtask)); + } + + Collections.shuffle(tasks); + + int numSuccessfulHandovers = 0; + for (Future future : executorService.invokeAll(tasks)) { + StringPair stringPair = future.get(); + if (stringPair != null) { + assertEquals("value" + stringPair.getFirst(), stringPair.getSecond()); + numSuccessfulHandovers++; + } + } + + assertEquals(subtasks, numSuccessfulHandovers); + } + + class IterationHead implements Callable { + + private final Random random; + + private final Broker broker; + + private final String key; + + private final String value; + + IterationHead(Broker broker, Integer key, String value) { + this.broker = broker; + this.key = String.valueOf(key); + this.value = value; + random = new Random(); + } + + @Override + public StringPair call() throws Exception { + Thread.sleep(random.nextInt(10)); + // System.out.println("Head " + key + " hands in " + value); + broker.handIn(key, value); + Thread.sleep(random.nextInt(10)); + return null; + } + + } + + class IterationTail implements Callable { + + private final Random random; + + private final Broker broker; + + private final String key; + + IterationTail(Broker broker, Integer key) { + this.broker = broker; + this.key = String.valueOf(key); + random = new Random(); + } + + @Override + public StringPair call() throws Exception { + + Thread.sleep(random.nextInt(10)); +// System.out.println("Tail " + key + " asks for handover"); + String value = broker.get(key); + +// System.out.println("Tail " + key + " received " + value); + Preconditions.checkNotNull(value); + + return new StringPair(key, value); + } + } } diff --git a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/StringPair.java b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/StringPair.java index 7695cb11196..75bde29a13e 100644 --- a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/StringPair.java +++ b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/StringPair.java @@ -17,19 +17,20 @@ package eu.stratosphere.pact.runtime.iterative.concurrent; class StringPair { - private final String first; - private final String second; + private final String first; - StringPair(String first, String second) { - this.first = first; - this.second = second; - } + private final String second; - public String getFirst() { - return first; - } + StringPair(String first, String second) { + this.first = first; + this.second = second; + } - public String getSecond() { - return second; - } + public String getFirst() { + return first; + } + + public String getSecond() { + return second; + } } \ No newline at end of file diff --git a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/SuperstepBarrierTest.java b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/SuperstepBarrierTest.java index b157e0c3cf4..0cd7bb8e15e 100644 --- a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/SuperstepBarrierTest.java +++ b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/iterative/concurrent/SuperstepBarrierTest.java @@ -27,107 +27,111 @@ import static org.junit.Assert.assertTrue; public class SuperstepBarrierTest { - @Test - public void syncEndOfSuperstep() throws InterruptedException { - for (int n = 0; n < 20; n++) { - sync(new EndOfSuperstepEvent()); - } - } + @Test + public void syncEndOfSuperstep() throws InterruptedException { + for (int n = 0; n < 20; n++) { + sync(new EndOfSuperstepEvent()); + } + } - @Test - public void syncTermination() throws InterruptedException { - for (int n = 0; n < 20; n++) { - sync(new TerminationEvent()); - } - } + @Test + public void syncTermination() throws InterruptedException { + for (int n = 0; n < 20; n++) { + sync(new TerminationEvent()); + } + } - private void sync(AbstractTaskEvent event) throws InterruptedException { + private void sync(AbstractTaskEvent event) throws InterruptedException { - TerminationSignaled terminationSignaled = new TerminationSignaled(); + TerminationSignaled terminationSignaled = new TerminationSignaled(); - SuperstepBarrier barrier = new SuperstepBarrier(); - barrier.setup(); + SuperstepBarrier barrier = new SuperstepBarrier(); + barrier.setup(); - Thread headThread = new Thread(new IterationHead(barrier, terminationSignaled)); - Thread syncThread = new Thread(new IterationSync(barrier, event)); + Thread headThread = new Thread(new IterationHead(barrier, terminationSignaled)); + Thread syncThread = new Thread(new IterationSync(barrier, event)); - headThread.start(); - syncThread.start(); + headThread.start(); + syncThread.start(); - headThread.join(); - syncThread.join(); + headThread.join(); + syncThread.join(); - if (event instanceof TerminationEvent) { - assertTrue(terminationSignaled.isTerminationSignaled()); - } else { - assertFalse(terminationSignaled.isTerminationSignaled()); - } - } + if (event instanceof TerminationEvent) { + assertTrue(terminationSignaled.isTerminationSignaled()); + } else { + assertFalse(terminationSignaled.isTerminationSignaled()); + } + } - class IterationHead implements Runnable { + class IterationHead implements Runnable { - private final SuperstepBarrier barrier; - private final TerminationSignaled terminationSignaled; - private final Random random; + private final SuperstepBarrier barrier; - IterationHead(SuperstepBarrier barrier, TerminationSignaled terminationSignaled) { - this.barrier = barrier; - this.terminationSignaled = terminationSignaled; - random = new Random(); - } + private final TerminationSignaled terminationSignaled; - @Override - public void run() { - try { - Thread.sleep(random.nextInt(10)); + private final Random random; - barrier.waitForOtherWorkers(); + IterationHead(SuperstepBarrier barrier, TerminationSignaled terminationSignaled) { + this.barrier = barrier; + this.terminationSignaled = terminationSignaled; + random = new Random(); + } - if (barrier.terminationSignaled()) { - terminationSignaled.setTerminationSignaled(); - } + @Override + public void run() { + try { + Thread.sleep(random.nextInt(10)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } + barrier.waitForOtherWorkers(); - class IterationSync implements Runnable { + if (barrier.terminationSignaled()) { + terminationSignaled.setTerminationSignaled(); + } - private final SuperstepBarrier barrier; - private final AbstractTaskEvent event; - private final Random random; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } - IterationSync(SuperstepBarrier barrier, AbstractTaskEvent event) { - this.barrier = barrier; - this.event = event; - random = new Random(); - } + class IterationSync implements Runnable { - @Override - public void run() { - try { - Thread.sleep(random.nextInt(10)); + private final SuperstepBarrier barrier; - barrier.eventOccurred(event); + private final AbstractTaskEvent event; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - } + private final Random random; - class TerminationSignaled { + IterationSync(SuperstepBarrier barrier, AbstractTaskEvent event) { + this.barrier = barrier; + this.event = event; + random = new Random(); + } - private volatile boolean terminationSignaled; + @Override + public void run() { + try { + Thread.sleep(random.nextInt(10)); - public boolean isTerminationSignaled() { - return terminationSignaled; - } + barrier.eventOccurred(event); - public void setTerminationSignaled() { - terminationSignaled = true; - } - } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + class TerminationSignaled { + + private volatile boolean terminationSignaled; + + public boolean isTerminationSignaled() { + return terminationSignaled; + } + + public void setTerminationSignaled() { + terminationSignaled = true; + } + } } diff --git a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortTest.java b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java similarity index 98% rename from pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortTest.java rename to pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java index a57ea4f8f3c..64fa33b57cd 100644 --- a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortTest.java +++ b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/HashVsSortMiniBenchmark.java @@ -46,7 +46,8 @@ import eu.stratosphere.pact.runtime.test.util.TestData.Generator.KeyMode; import eu.stratosphere.pact.runtime.test.util.TestData.Generator.ValueMode; -public class HashVsSortTest { +public class HashVsSortMiniBenchmark { + // total memory private static final int MEMORY_SIZE = 1024 * 1024 * 32; @@ -83,8 +84,7 @@ public class HashVsSortTest { @SuppressWarnings("unchecked") @Before - public void beforeTest() - { + public void beforeTest() { this.serializer1 = PactRecordSerializer.get(); this.serializer2 = PactRecordSerializer.get(); this.comparator1 = new PactRecordComparator(new int[] {0}, new Class[] {TestData.Key.class}); @@ -96,8 +96,7 @@ public class HashVsSortTest { } @After - public void afterTest() - { + public void afterTest() { if (this.memoryManager != null) { Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", this.memoryManager.verifyEmpty()); @@ -244,10 +243,8 @@ public class HashVsSortTest { } - private static final class NoOpMatcher extends MatchStub - { + private static final class NoOpMatcher extends MatchStub { @Override public void match(PactRecord rec1, PactRecord rec2, Collector out) {} - } } diff --git a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java index 650ac2b599c..117068fbaaf 100644 --- a/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java +++ b/pact/pact-runtime/src/test/java/eu/stratosphere/pact/runtime/task/util/OutputEmitterTest.java @@ -42,13 +42,12 @@ import eu.stratosphere.pact.runtime.plugable.pactrecord.PactRecordComparator; import eu.stratosphere.pact.runtime.shipping.PactRecordOutputEmitter; import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; -public class OutputEmitterTest extends TestCase -{ +public class OutputEmitterTest extends TestCase { + private static final long SEED = 485213591485399L; @Test - public void testPartitionHash() - { + public void testPartitionHash() { // Test for PactInteger @SuppressWarnings("unchecked") final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class}); @@ -105,8 +104,7 @@ public class OutputEmitterTest extends TestCase } @Test - public void testForward() - { + public void testForward() { // Test for PactInteger @SuppressWarnings("unchecked") final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class}); @@ -164,8 +162,7 @@ public class OutputEmitterTest extends TestCase } @Test - public void testBroadcast() - { + public void testBroadcast() { // Test for PactInteger @SuppressWarnings("unchecked") final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class}); @@ -216,8 +213,7 @@ public class OutputEmitterTest extends TestCase } @Test - public void testMultiKeys() - { + public void testMultiKeys() { @SuppressWarnings("unchecked") final PactRecordComparator multiComp = new PactRecordComparator(new int[] {0,1,3}, new Class[] {PactInteger.class, PactString.class, PactDouble.class}); final ChannelSelector oe1 = new PactRecordOutputEmitter(ShipStrategyType.PARTITION_HASH, multiComp); @@ -249,8 +245,7 @@ public class OutputEmitterTest extends TestCase } @Test - public void testMissingKey() - { + public void testMissingKey() { // Test for PactInteger @SuppressWarnings("unchecked") final PactRecordComparator intComp = new PactRecordComparator(new int[] {1}, new Class[] {PactInteger.class}); @@ -269,8 +264,7 @@ public class OutputEmitterTest extends TestCase } @Test - public void testNullKey() - { + public void testNullKey() { // Test for PactInteger @SuppressWarnings("unchecked") final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class}); @@ -324,8 +318,7 @@ public class OutputEmitterTest extends TestCase } @Test - public void testPartitionRange() - { + public void testPartitionRange() { final Random rnd = new Random(SEED); final int DISTR_MIN = 0; @@ -370,28 +363,30 @@ public class OutputEmitterTest extends TestCase } } - private static final class IntegerUniformDistribution implements DataDistribution - { + private static final class IntegerUniformDistribution implements DataDistribution { + private int min; private int max; - public IntegerUniformDistribution(int min, int max) - { + public IntegerUniformDistribution(int min, int max) { this.min = min; this.max = max; } @Override - public void write(DataOutput out) - {} + public void write(DataOutput out) throws IOException { + out.writeInt(this.min); + out.writeInt(this.max); + } @Override - public void read(DataInput in) - {} + public void read(DataInput in) throws IOException { + this.min = in.readInt(); + this.max = in.readInt(); + } @Override - public PactRecord getBucketBoundary(int splitNum, int totalSplits) - { + public PactRecord getBucketBoundary(int splitNum, int totalSplits) { final int range = this.max - this.min + 1; final float bucketWidth = ((float) range) / totalSplits; final int upperBoundary = this.min + (int) ((splitNum + 1) * bucketWidth); diff --git a/pom.xml b/pom.xml index 09658c9761c..ba3cf5977b8 100644 --- a/pom.xml +++ b/pom.xml @@ -160,28 +160,29 @@ - - + + - - + + - - + + + 2.3 TODO @@ -190,7 +191,7 @@ @deprecated - + --> - - + + - - + + + 2.0.1 Max Medium true true - + --> - - + + + 2.5 1.6 @@ -236,18 +236,17 @@ - + --> - + - org.apache.maven.plugins maven-javadoc-plugin 2.5 + - org.apache.maven.plugins maven-checkstyle-plugin 2.6 @@ -256,15 +255,15 @@ + - org.apache.maven.plugins maven-surefire-report-plugin 2.7 - - + + - + @@ -409,7 +408,7 @@ - + + nephele -- GitLab