提交 027179c5 编写于 作者: S sewen

Cleaned up pom.xml

Small fix for pact runtime tests.
上级 2fd68f11
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.io;
import java.lang.ref.WeakReference;
import java.util.Random;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class MemoryAccessSpeedBenchmark {
private static final int ARRAY_LENGTH = 1024 * 1024 * 164;
private static final int SEGMENT_OFFSET = 1024 * 1024 * 16;
private static final int SEGMENT_LENGTH = 1024 * 1024 * 128; // 128M segment
private static final int NUMBER_OF_ITERATIONS = 16; // x8 iterations (i.e. test for 1G read + 1G write throughput )
private static final long RANDOM_SEED = 235646234421L;
private static byte[] sourceBytes;
private static byte[] targetBytes;
private static MemorySegmentHardReference segmentHardReference;
private static MemorySegmentWeakReference segmentWeakReference;
private static Random random = new Random();
@BeforeClass
public static void initialize() {
sourceBytes = new byte[SEGMENT_LENGTH];
targetBytes = new byte[ARRAY_LENGTH];
MemorySegmentDescriptor descriptor = new MemorySegmentDescriptor(targetBytes, SEGMENT_OFFSET, SEGMENT_LENGTH);
WeakReference<MemorySegmentDescriptor> descriptorReference = new WeakReference<MemorySegmentDescriptor>(
descriptor);
segmentHardReference = new MemorySegmentHardReference(descriptor);
segmentWeakReference = new MemorySegmentWeakReference(descriptorReference);
}
@AfterClass
public static void destruct() {
sourceBytes = null;
targetBytes = null;
segmentHardReference = null;
segmentWeakReference = null;
}
@Before
public void setUp() {
random.setSeed(RANDOM_SEED);
random.nextBytes(sourceBytes);
}
@Test
public void testIndirectAccessWithWeakReference() {
for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
for (int j = 0; j < SEGMENT_LENGTH; j++) {
segmentWeakReference.write(j, sourceBytes[j]);
}
for (int j = 0; j < SEGMENT_LENGTH; j++) {
assert (sourceBytes[j] == segmentWeakReference.read(j));
}
}
}
@Test
public void testIndirectAccessWithHardReference() {
for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
for (int j = 0; j < SEGMENT_LENGTH; j++) {
segmentHardReference.write(j, sourceBytes[j]);
}
for (int j = 0; j < SEGMENT_LENGTH; j++) {
assert (sourceBytes[j] == segmentHardReference.read(j));
}
}
}
@Test
public void testDirectAccess() {
for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
for (int j = 0; j < SEGMENT_LENGTH; j++) {
targetBytes[SEGMENT_OFFSET + j] = sourceBytes[j];
}
for (int j = 0; j < SEGMENT_LENGTH; j++) {
assert (sourceBytes[j] == targetBytes[SEGMENT_OFFSET + j]);
}
}
}
private static final class MemorySegmentHardReference {
private final MemorySegmentDescriptor descriptor;
public MemorySegmentHardReference(MemorySegmentDescriptor descriptor) {
this.descriptor = descriptor;
}
public byte read(int position) {
if (position < descriptor.start || descriptor.end >= position) {
return descriptor.memory[(position + descriptor.start)];
} else {
throw new IndexOutOfBoundsException();
}
}
public void write(int position, byte data) {
if (position < descriptor.start || descriptor.end >= position) {
descriptor.memory[(position + descriptor.start)] = data;
} else {
throw new IndexOutOfBoundsException();
}
}
}
private static final class MemorySegmentWeakReference {
private final WeakReference<MemorySegmentDescriptor> descriptorReference;
public MemorySegmentWeakReference(WeakReference<MemorySegmentDescriptor> descriptorReference) {
this.descriptorReference = descriptorReference;
}
public byte read(int position) {
MemorySegmentDescriptor descriptor = descriptorReference.get();
if (position < descriptor.start || descriptor.end >= position) {
return descriptor.memory[(position + descriptor.start)];
} else {
throw new IndexOutOfBoundsException();
}
}
public void write(int position, byte data) {
MemorySegmentDescriptor descriptor = descriptorReference.get();
if (position < descriptor.start || descriptor.end >= position) {
descriptor.memory[(position + descriptor.start)] = data;
} else {
throw new IndexOutOfBoundsException();
}
}
}
private static final class MemorySegmentDescriptor {
public final byte[] memory;
public final int start;
public final int end;
public MemorySegmentDescriptor(byte[] bytes, int start, int end) {
this.memory = bytes;
this.start = start;
this.end = end;
}
}
}
...@@ -31,121 +31,128 @@ import static org.junit.Assert.assertEquals; ...@@ -31,121 +31,128 @@ import static org.junit.Assert.assertEquals;
public class BlockingBackChannelTest { public class BlockingBackChannelTest {
private static final int NUM_ITERATIONS = 3; private static final int NUM_ITERATIONS = 3;
private static final Integer INPUT_COMPLETELY_PROCESSED_MESSAGE = 1;
private static final Integer INPUT_COMPLETELY_PROCESSED_MESSAGE = 1;
@Test
public void multiThreaded() throws InterruptedException { @Test
public void multiThreaded() throws InterruptedException {
BlockingQueue<Integer> dataChannel = new ArrayBlockingQueue<Integer>(1);
List<String> actionLog = Lists.newArrayList(); BlockingQueue<Integer> dataChannel = new ArrayBlockingQueue<Integer>(1);
List<String> actionLog = Lists.newArrayList();
SerializedUpdateBuffer buffer = Mockito.mock(SerializedUpdateBuffer.class);
BlockingBackChannel channel = new BlockingBackChannel(buffer); SerializedUpdateBuffer buffer = Mockito.mock(SerializedUpdateBuffer.class);
BlockingBackChannel channel = new BlockingBackChannel(buffer);
Thread head = new Thread(new IterationHead(channel, dataChannel, actionLog));
Thread tail = new Thread(new IterationTail(channel, dataChannel, actionLog)); Thread head = new Thread(new IterationHead(channel, dataChannel, actionLog));
Thread tail = new Thread(new IterationTail(channel, dataChannel, actionLog));
tail.start();
head.start(); tail.start();
head.start();
head.join();
tail.join(); head.join();
tail.join();
int action = 0;
for (String log : actionLog) { // int action = 0;
System.out.println("ACTION " + (++action) + ": " + log); // for (String log : actionLog) {
} // System.out.println("ACTION " + (++action) + ": " + log);
// }
assertEquals(12, actionLog.size());
assertEquals(12, actionLog.size());
assertEquals("head sends data", actionLog.get(0));
assertEquals("tail receives data", actionLog.get(1)); assertEquals("head sends data", actionLog.get(0));
assertEquals("tail writes in iteration 0", actionLog.get(2)); assertEquals("tail receives data", actionLog.get(1));
assertEquals("head reads in iteration 0", actionLog.get(3)); assertEquals("tail writes in iteration 0", actionLog.get(2));
assertEquals("head sends data", actionLog.get(4)); assertEquals("head reads in iteration 0", actionLog.get(3));
assertEquals("tail receives data", actionLog.get(5)); assertEquals("head sends data", actionLog.get(4));
assertEquals("tail writes in iteration 1", actionLog.get(6)); assertEquals("tail receives data", actionLog.get(5));
assertEquals("head reads in iteration 1", actionLog.get(7)); assertEquals("tail writes in iteration 1", actionLog.get(6));
assertEquals("head sends data", actionLog.get(8)); assertEquals("head reads in iteration 1", actionLog.get(7));
assertEquals("tail receives data", actionLog.get(9)); assertEquals("head sends data", actionLog.get(8));
assertEquals("tail writes in iteration 2", actionLog.get(10)); assertEquals("tail receives data", actionLog.get(9));
assertEquals("head reads in iteration 2", actionLog.get(11)); assertEquals("tail writes in iteration 2", actionLog.get(10));
} assertEquals("head reads in iteration 2", actionLog.get(11));
}
class IterationHead implements Runnable {
class IterationHead implements Runnable {
private final BlockingBackChannel backChannel;
private final BlockingQueue<Integer> dataChannel; private final BlockingBackChannel backChannel;
private final Random random;
private final List<String> actionLog; private final BlockingQueue<Integer> dataChannel;
IterationHead(BlockingBackChannel backChannel, BlockingQueue<Integer> dataChannel, List<String> actionLog) { private final Random random;
this.backChannel = backChannel;
this.dataChannel = dataChannel; private final List<String> actionLog;
this.actionLog = actionLog;
random = new Random(); IterationHead(BlockingBackChannel backChannel, BlockingQueue<Integer> dataChannel, List<String> actionLog) {
} this.backChannel = backChannel;
this.dataChannel = dataChannel;
@Override this.actionLog = actionLog;
public void run() { random = new Random();
processInputAndSendMessageThroughDataChannel(); }
for (int n = 0; n < NUM_ITERATIONS; n++) {
try { @Override
backChannel.getReadEndAfterSuperstepEnded(); public void run() {
actionLog.add("head reads in iteration " + n); processInputAndSendMessageThroughDataChannel();
Thread.sleep(random.nextInt(100)); for (int n = 0; n < NUM_ITERATIONS; n++) {
// we don't send through the data channel in the last iteration, we would send to the output task try {
if (n != NUM_ITERATIONS - 1) { backChannel.getReadEndAfterSuperstepEnded();
processInputAndSendMessageThroughDataChannel(); actionLog.add("head reads in iteration " + n);
} Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) { // we don't send through the data channel in the last iteration, we would send to the output task
throw new RuntimeException(e); if (n != NUM_ITERATIONS - 1) {
} processInputAndSendMessageThroughDataChannel();
} }
} } catch (InterruptedException e) {
throw new RuntimeException(e);
void processInputAndSendMessageThroughDataChannel() { }
actionLog.add("head sends data"); }
dataChannel.offer(INPUT_COMPLETELY_PROCESSED_MESSAGE); }
}
} void processInputAndSendMessageThroughDataChannel() {
actionLog.add("head sends data");
class IterationTail implements Runnable { dataChannel.offer(INPUT_COMPLETELY_PROCESSED_MESSAGE);
}
private final BlockingBackChannel backChannel; }
private final BlockingQueue<Integer> dataChannel;
private final Random random; class IterationTail implements Runnable {
private final List<String> actionLog;
private final BlockingBackChannel backChannel;
IterationTail(BlockingBackChannel backChannel, BlockingQueue<Integer> dataChannel, List<String> actionLog) {
this.backChannel = backChannel; private final BlockingQueue<Integer> dataChannel;
this.dataChannel = dataChannel;
this.actionLog = actionLog; private final Random random;
random = new Random();
} private final List<String> actionLog;
@Override IterationTail(BlockingBackChannel backChannel, BlockingQueue<Integer> dataChannel, List<String> actionLog) {
public void run() { this.backChannel = backChannel;
try { this.dataChannel = dataChannel;
for (int n = 0; n < NUM_ITERATIONS; n++) { this.actionLog = actionLog;
DataOutputView writeEnd = backChannel.getWriteEnd(); random = new Random();
readInputFromDataChannel(); }
Thread.sleep(random.nextInt(10));
DataInputView inputView = Mockito.mock(DataInputView.class); @Override
actionLog.add("tail writes in iteration " + n); public void run() {
writeEnd.write(inputView, 1); try {
backChannel.notifyOfEndOfSuperstep(); for (int n = 0; n < NUM_ITERATIONS; n++) {
} DataOutputView writeEnd = backChannel.getWriteEnd();
} catch (Exception e) { readInputFromDataChannel();
throw new RuntimeException(e); Thread.sleep(random.nextInt(10));
} DataInputView inputView = Mockito.mock(DataInputView.class);
} actionLog.add("tail writes in iteration " + n);
writeEnd.write(inputView, 1);
void readInputFromDataChannel() throws InterruptedException { backChannel.notifyOfEndOfSuperstep();
dataChannel.take(); }
actionLog.add("tail receives data"); } catch (Exception e) {
} throw new RuntimeException(e);
} }
}
void readInputFromDataChannel() throws InterruptedException {
dataChannel.take();
actionLog.add("tail receives data");
}
}
} }
...@@ -32,90 +32,94 @@ import static org.junit.Assert.assertEquals; ...@@ -32,90 +32,94 @@ import static org.junit.Assert.assertEquals;
public class BrokerTest { public class BrokerTest {
@Test @Test
public void mediation() throws Exception { public void mediation() throws Exception {
Random random = new Random(); Random random = new Random();
for (int n = 0; n < 20; n++) { for (int n = 0; n < 20; n++) {
mediate(random.nextInt(10) + 1); mediate(random.nextInt(10) + 1);
} }
} }
void mediate(int subtasks) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(subtasks * 2);
List<Callable<StringPair>> tasks = Lists.newArrayList();
Broker<String> broker = new Broker<String>();
for (int subtask = 0; subtask < subtasks; subtask++) {
tasks.add(new IterationHead(broker, subtask, "value" + subtask));
tasks.add(new IterationTail(broker, subtask));
}
Collections.shuffle(tasks);
int numSuccessfulHandovers = 0;
for (Future<StringPair> future : executorService.invokeAll(tasks)) {
StringPair stringPair = future.get();
if (stringPair != null) {
assertEquals("value" + stringPair.getFirst(), stringPair.getSecond());
numSuccessfulHandovers++;
}
}
assertEquals(subtasks, numSuccessfulHandovers);
}
class IterationHead implements Callable<StringPair> {
private final Random random;
private final Broker<String> broker;
private final String key;
private final String value;
IterationHead(Broker<String> broker, Integer key, String value) {
this.broker = broker;
this.key = String.valueOf(key);
this.value = value;
random = new Random();
}
@Override
public StringPair call() throws Exception {
Thread.sleep(random.nextInt(10));
System.out.println("Head " + key + " hands in " + value);
broker.handIn(key, value);
Thread.sleep(random.nextInt(10));
return null;
}
}
class IterationTail implements Callable<StringPair> {
private final Random random;
private final Broker<String> broker;
private final String key;
IterationTail(Broker<String> broker, Integer key) {
this.broker = broker;
this.key = String.valueOf(key);
random = new Random();
}
@Override
public StringPair call() throws Exception {
Thread.sleep(random.nextInt(10));
System.out.println("Tail " + key + " asks for handover");
String value = broker.get(key);
System.out.println("Tail " + key + " received " + value);
Preconditions.checkNotNull(value);
return new StringPair(key, value);
}
}
void mediate(int subtasks) throws InterruptedException, ExecutionException {
ExecutorService executorService = Executors.newFixedThreadPool(subtasks * 2);
List<Callable<StringPair>> tasks = Lists.newArrayList();
Broker<String> broker = new Broker<String>();
for (int subtask = 0; subtask < subtasks; subtask++) {
tasks.add(new IterationHead(broker, subtask, "value" + subtask));
tasks.add(new IterationTail(broker, subtask));
}
Collections.shuffle(tasks);
int numSuccessfulHandovers = 0;
for (Future<StringPair> future : executorService.invokeAll(tasks)) {
StringPair stringPair = future.get();
if (stringPair != null) {
assertEquals("value" + stringPair.getFirst(), stringPair.getSecond());
numSuccessfulHandovers++;
}
}
assertEquals(subtasks, numSuccessfulHandovers);
}
class IterationHead implements Callable<StringPair> {
private final Random random;
private final Broker<String> broker;
private final String key;
private final String value;
IterationHead(Broker<String> broker, Integer key, String value) {
this.broker = broker;
this.key = String.valueOf(key);
this.value = value;
random = new Random();
}
@Override
public StringPair call() throws Exception {
Thread.sleep(random.nextInt(10));
// System.out.println("Head " + key + " hands in " + value);
broker.handIn(key, value);
Thread.sleep(random.nextInt(10));
return null;
}
}
class IterationTail implements Callable<StringPair> {
private final Random random;
private final Broker<String> broker;
private final String key;
IterationTail(Broker<String> broker, Integer key) {
this.broker = broker;
this.key = String.valueOf(key);
random = new Random();
}
@Override
public StringPair call() throws Exception {
Thread.sleep(random.nextInt(10));
// System.out.println("Tail " + key + " asks for handover");
String value = broker.get(key);
// System.out.println("Tail " + key + " received " + value);
Preconditions.checkNotNull(value);
return new StringPair(key, value);
}
}
} }
...@@ -17,19 +17,20 @@ package eu.stratosphere.pact.runtime.iterative.concurrent; ...@@ -17,19 +17,20 @@ package eu.stratosphere.pact.runtime.iterative.concurrent;
class StringPair { class StringPair {
private final String first; private final String first;
private final String second;
StringPair(String first, String second) { private final String second;
this.first = first;
this.second = second;
}
public String getFirst() { StringPair(String first, String second) {
return first; this.first = first;
} this.second = second;
}
public String getSecond() { public String getFirst() {
return second; return first;
} }
public String getSecond() {
return second;
}
} }
\ No newline at end of file
...@@ -27,107 +27,111 @@ import static org.junit.Assert.assertTrue; ...@@ -27,107 +27,111 @@ import static org.junit.Assert.assertTrue;
public class SuperstepBarrierTest { public class SuperstepBarrierTest {
@Test @Test
public void syncEndOfSuperstep() throws InterruptedException { public void syncEndOfSuperstep() throws InterruptedException {
for (int n = 0; n < 20; n++) { for (int n = 0; n < 20; n++) {
sync(new EndOfSuperstepEvent()); sync(new EndOfSuperstepEvent());
} }
} }
@Test @Test
public void syncTermination() throws InterruptedException { public void syncTermination() throws InterruptedException {
for (int n = 0; n < 20; n++) { for (int n = 0; n < 20; n++) {
sync(new TerminationEvent()); sync(new TerminationEvent());
} }
} }
private void sync(AbstractTaskEvent event) throws InterruptedException { private void sync(AbstractTaskEvent event) throws InterruptedException {
TerminationSignaled terminationSignaled = new TerminationSignaled(); TerminationSignaled terminationSignaled = new TerminationSignaled();
SuperstepBarrier barrier = new SuperstepBarrier(); SuperstepBarrier barrier = new SuperstepBarrier();
barrier.setup(); barrier.setup();
Thread headThread = new Thread(new IterationHead(barrier, terminationSignaled)); Thread headThread = new Thread(new IterationHead(barrier, terminationSignaled));
Thread syncThread = new Thread(new IterationSync(barrier, event)); Thread syncThread = new Thread(new IterationSync(barrier, event));
headThread.start(); headThread.start();
syncThread.start(); syncThread.start();
headThread.join(); headThread.join();
syncThread.join(); syncThread.join();
if (event instanceof TerminationEvent) { if (event instanceof TerminationEvent) {
assertTrue(terminationSignaled.isTerminationSignaled()); assertTrue(terminationSignaled.isTerminationSignaled());
} else { } else {
assertFalse(terminationSignaled.isTerminationSignaled()); assertFalse(terminationSignaled.isTerminationSignaled());
} }
} }
class IterationHead implements Runnable { class IterationHead implements Runnable {
private final SuperstepBarrier barrier; private final SuperstepBarrier barrier;
private final TerminationSignaled terminationSignaled;
private final Random random;
IterationHead(SuperstepBarrier barrier, TerminationSignaled terminationSignaled) { private final TerminationSignaled terminationSignaled;
this.barrier = barrier;
this.terminationSignaled = terminationSignaled;
random = new Random();
}
@Override private final Random random;
public void run() {
try {
Thread.sleep(random.nextInt(10));
barrier.waitForOtherWorkers(); IterationHead(SuperstepBarrier barrier, TerminationSignaled terminationSignaled) {
this.barrier = barrier;
this.terminationSignaled = terminationSignaled;
random = new Random();
}
if (barrier.terminationSignaled()) { @Override
terminationSignaled.setTerminationSignaled(); public void run() {
} try {
Thread.sleep(random.nextInt(10));
} catch (Exception e) { barrier.waitForOtherWorkers();
throw new RuntimeException(e);
}
}
}
class IterationSync implements Runnable { if (barrier.terminationSignaled()) {
terminationSignaled.setTerminationSignaled();
}
private final SuperstepBarrier barrier; } catch (Exception e) {
private final AbstractTaskEvent event; throw new RuntimeException(e);
private final Random random; }
}
}
IterationSync(SuperstepBarrier barrier, AbstractTaskEvent event) { class IterationSync implements Runnable {
this.barrier = barrier;
this.event = event;
random = new Random();
}
@Override private final SuperstepBarrier barrier;
public void run() {
try {
Thread.sleep(random.nextInt(10));
barrier.eventOccurred(event); private final AbstractTaskEvent event;
} catch (Exception e) { private final Random random;
throw new RuntimeException(e);
}
}
}
class TerminationSignaled { IterationSync(SuperstepBarrier barrier, AbstractTaskEvent event) {
this.barrier = barrier;
this.event = event;
random = new Random();
}
private volatile boolean terminationSignaled; @Override
public void run() {
try {
Thread.sleep(random.nextInt(10));
public boolean isTerminationSignaled() { barrier.eventOccurred(event);
return terminationSignaled;
}
public void setTerminationSignaled() { } catch (Exception e) {
terminationSignaled = true; throw new RuntimeException(e);
} }
} }
}
class TerminationSignaled {
private volatile boolean terminationSignaled;
public boolean isTerminationSignaled() {
return terminationSignaled;
}
public void setTerminationSignaled() {
terminationSignaled = true;
}
}
} }
...@@ -46,7 +46,8 @@ import eu.stratosphere.pact.runtime.test.util.TestData.Generator.KeyMode; ...@@ -46,7 +46,8 @@ import eu.stratosphere.pact.runtime.test.util.TestData.Generator.KeyMode;
import eu.stratosphere.pact.runtime.test.util.TestData.Generator.ValueMode; import eu.stratosphere.pact.runtime.test.util.TestData.Generator.ValueMode;
public class HashVsSortTest { public class HashVsSortMiniBenchmark {
// total memory // total memory
private static final int MEMORY_SIZE = 1024 * 1024 * 32; private static final int MEMORY_SIZE = 1024 * 1024 * 32;
...@@ -83,8 +84,7 @@ public class HashVsSortTest { ...@@ -83,8 +84,7 @@ public class HashVsSortTest {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Before @Before
public void beforeTest() public void beforeTest() {
{
this.serializer1 = PactRecordSerializer.get(); this.serializer1 = PactRecordSerializer.get();
this.serializer2 = PactRecordSerializer.get(); this.serializer2 = PactRecordSerializer.get();
this.comparator1 = new PactRecordComparator(new int[] {0}, new Class[] {TestData.Key.class}); this.comparator1 = new PactRecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
...@@ -96,8 +96,7 @@ public class HashVsSortTest { ...@@ -96,8 +96,7 @@ public class HashVsSortTest {
} }
@After @After
public void afterTest() public void afterTest() {
{
if (this.memoryManager != null) { if (this.memoryManager != null) {
Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
this.memoryManager.verifyEmpty()); this.memoryManager.verifyEmpty());
...@@ -244,10 +243,8 @@ public class HashVsSortTest { ...@@ -244,10 +243,8 @@ public class HashVsSortTest {
} }
private static final class NoOpMatcher extends MatchStub private static final class NoOpMatcher extends MatchStub {
{
@Override @Override
public void match(PactRecord rec1, PactRecord rec2, Collector<PactRecord> out) {} public void match(PactRecord rec1, PactRecord rec2, Collector<PactRecord> out) {}
} }
} }
...@@ -42,13 +42,12 @@ import eu.stratosphere.pact.runtime.plugable.pactrecord.PactRecordComparator; ...@@ -42,13 +42,12 @@ import eu.stratosphere.pact.runtime.plugable.pactrecord.PactRecordComparator;
import eu.stratosphere.pact.runtime.shipping.PactRecordOutputEmitter; import eu.stratosphere.pact.runtime.shipping.PactRecordOutputEmitter;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
public class OutputEmitterTest extends TestCase public class OutputEmitterTest extends TestCase {
{
private static final long SEED = 485213591485399L; private static final long SEED = 485213591485399L;
@Test @Test
public void testPartitionHash() public void testPartitionHash() {
{
// Test for PactInteger // Test for PactInteger
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class}); final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class});
...@@ -105,8 +104,7 @@ public class OutputEmitterTest extends TestCase ...@@ -105,8 +104,7 @@ public class OutputEmitterTest extends TestCase
} }
@Test @Test
public void testForward() public void testForward() {
{
// Test for PactInteger // Test for PactInteger
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class}); final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class});
...@@ -164,8 +162,7 @@ public class OutputEmitterTest extends TestCase ...@@ -164,8 +162,7 @@ public class OutputEmitterTest extends TestCase
} }
@Test @Test
public void testBroadcast() public void testBroadcast() {
{
// Test for PactInteger // Test for PactInteger
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class}); final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class});
...@@ -216,8 +213,7 @@ public class OutputEmitterTest extends TestCase ...@@ -216,8 +213,7 @@ public class OutputEmitterTest extends TestCase
} }
@Test @Test
public void testMultiKeys() public void testMultiKeys() {
{
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final PactRecordComparator multiComp = new PactRecordComparator(new int[] {0,1,3}, new Class[] {PactInteger.class, PactString.class, PactDouble.class}); final PactRecordComparator multiComp = new PactRecordComparator(new int[] {0,1,3}, new Class[] {PactInteger.class, PactString.class, PactDouble.class});
final ChannelSelector<PactRecord> oe1 = new PactRecordOutputEmitter(ShipStrategyType.PARTITION_HASH, multiComp); final ChannelSelector<PactRecord> oe1 = new PactRecordOutputEmitter(ShipStrategyType.PARTITION_HASH, multiComp);
...@@ -249,8 +245,7 @@ public class OutputEmitterTest extends TestCase ...@@ -249,8 +245,7 @@ public class OutputEmitterTest extends TestCase
} }
@Test @Test
public void testMissingKey() public void testMissingKey() {
{
// Test for PactInteger // Test for PactInteger
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final PactRecordComparator intComp = new PactRecordComparator(new int[] {1}, new Class[] {PactInteger.class}); final PactRecordComparator intComp = new PactRecordComparator(new int[] {1}, new Class[] {PactInteger.class});
...@@ -269,8 +264,7 @@ public class OutputEmitterTest extends TestCase ...@@ -269,8 +264,7 @@ public class OutputEmitterTest extends TestCase
} }
@Test @Test
public void testNullKey() public void testNullKey() {
{
// Test for PactInteger // Test for PactInteger
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class}); final PactRecordComparator intComp = new PactRecordComparator(new int[] {0}, new Class[] {PactInteger.class});
...@@ -324,8 +318,7 @@ public class OutputEmitterTest extends TestCase ...@@ -324,8 +318,7 @@ public class OutputEmitterTest extends TestCase
} }
@Test @Test
public void testPartitionRange() public void testPartitionRange() {
{
final Random rnd = new Random(SEED); final Random rnd = new Random(SEED);
final int DISTR_MIN = 0; final int DISTR_MIN = 0;
...@@ -370,28 +363,30 @@ public class OutputEmitterTest extends TestCase ...@@ -370,28 +363,30 @@ public class OutputEmitterTest extends TestCase
} }
} }
private static final class IntegerUniformDistribution implements DataDistribution private static final class IntegerUniformDistribution implements DataDistribution {
{
private int min; private int min;
private int max; private int max;
public IntegerUniformDistribution(int min, int max) public IntegerUniformDistribution(int min, int max) {
{
this.min = min; this.min = min;
this.max = max; this.max = max;
} }
@Override @Override
public void write(DataOutput out) public void write(DataOutput out) throws IOException {
{} out.writeInt(this.min);
out.writeInt(this.max);
}
@Override @Override
public void read(DataInput in) public void read(DataInput in) throws IOException {
{} this.min = in.readInt();
this.max = in.readInt();
}
@Override @Override
public PactRecord getBucketBoundary(int splitNum, int totalSplits) public PactRecord getBucketBoundary(int splitNum, int totalSplits) {
{
final int range = this.max - this.min + 1; final int range = this.max - this.min + 1;
final float bucketWidth = ((float) range) / totalSplits; final float bucketWidth = ((float) range) / totalSplits;
final int upperBoundary = this.min + (int) ((splitNum + 1) * bucketWidth); final int upperBoundary = this.min + (int) ((splitNum + 1) * bucketWidth);
......
...@@ -160,28 +160,29 @@ ...@@ -160,28 +160,29 @@
</configuration> </configuration>
</plugin> </plugin>
<plugin> <!-- measure and report source code complexity -->
<!-- measure and report source code complexity --> <!-- <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>javancss-maven-plugin</artifactId> <artifactId>javancss-maven-plugin</artifactId>
<version>2.0</version> <version>2.0</version>
</plugin> </plugin> -->
<plugin> <!-- analyze dependencies in source code -->
<!-- analyze dependencies in source code --> <!-- <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>jdepend-maven-plugin</artifactId> <artifactId>jdepend-maven-plugin</artifactId>
</plugin> <version></version>
</plugin> -->
<!-- disabled because currently no SCM defined generates changelog <plugin> <!-- disabled because currently no SCM defined generates changelog <plugin>
<groupId>org.apache.maven.plugins</groupId> <artifactId>maven-changelog-plugin</artifactId> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-changelog-plugin</artifactId>
<version>2.2-SNAPSHOT</version> </plugin> --> <version>2.2-SNAPSHOT</version> </plugin> -->
<plugin> <!-- report occurrences of various todo markers in code -->
<!-- report occurrences of various todo markers in code --> <!-- <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>taglist-maven-plugin</artifactId> <artifactId>taglist-maven-plugin</artifactId>
<!-- <version>2.3</version> --> <version>2.3</version>
<configuration> <configuration>
<tags> <tags>
<tag>TODO</tag> <tag>TODO</tag>
...@@ -190,7 +191,7 @@ ...@@ -190,7 +191,7 @@
<tag>@deprecated</tag> <tag>@deprecated</tag>
</tags> </tags>
</configuration> </configuration>
</plugin> </plugin> -->
<!-- todo: reenable when SCM is available <plugin> <groupId>org.codehaus.mojo</groupId> <!-- todo: reenable when SCM is available <plugin> <groupId>org.codehaus.mojo</groupId>
<artifactId>scmchangelog-maven-plugin</artifactId> <version>1.2</version> <artifactId>scmchangelog-maven-plugin</artifactId> <version>1.2</version>
...@@ -198,33 +199,32 @@ ...@@ -198,33 +199,32 @@
<tagBase>https://projekte.itmc.tu-dortmund.de/svn/sla4dgrid/tags/</tagBase> <tagBase>https://projekte.itmc.tu-dortmund.de/svn/sla4dgrid/tags/</tagBase>
<filter>.*</filter> </configuration> </plugin> --> <filter>.*</filter> </configuration> </plugin> -->
<plugin> <!-- generates cross references in code so that you can click in the reports and jump to the respective lines -->
<!-- generates cross references in code so that you can click in the <!-- <plugin>
reports and jump to the respective lines -->
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>jxr-maven-plugin</artifactId> <artifactId>jxr-maven-plugin</artifactId>
<configuration> <configuration>
<linkJavadoc>true</linkJavadoc> <linkJavadoc>true</linkJavadoc>
</configuration> </configuration>
</plugin> </plugin> -->
<plugin> <!-- discovers frequent bugs in programs -->
<!-- discovers frequent bugs in programs --> <!-- <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId> <artifactId>findbugs-maven-plugin</artifactId>
<!-- <version>2.0.1</version> --> <version>2.0.1</version>
<configuration> <configuration>
<effort>Max</effort> <effort>Max</effort>
<threshold>Medium</threshold> <threshold>Medium</threshold>
<findbugsXmlOutput>true</findbugsXmlOutput> <findbugsXmlOutput>true</findbugsXmlOutput>
<xmlOutput>true</xmlOutput> <xmlOutput>true</xmlOutput>
</configuration> </configuration>
</plugin> </plugin> -->
<plugin> <!-- maven source code analysis for frequent bugs -->
<!-- maven source code analysis for frequent bugs --> <!-- <plugin>
<artifactId>maven-pmd-plugin</artifactId> <artifactId>maven-pmd-plugin</artifactId>
<!-- <version>2.5</version> --> <version>2.5</version>
<configuration> <configuration>
<targetJdk>1.6</targetJdk> <targetJdk>1.6</targetJdk>
</configuration> </configuration>
...@@ -236,18 +236,17 @@ ...@@ -236,18 +236,17 @@
</reports> </reports>
</reportSet> </reportSet>
</reportSets> </reportSets>
</plugin> </plugin> -->
<!-- TEST ME --> <!-- generation of JavaDoc -->
<plugin> <plugin>
<!-- generation of JavaDoc -->
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId> <artifactId>maven-javadoc-plugin</artifactId>
<version>2.5</version> <version>2.5</version>
</plugin> </plugin>
<!-- style checker -->
<plugin> <plugin>
<!-- style checker -->
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId> <artifactId>maven-checkstyle-plugin</artifactId>
<version>2.6</version> <version>2.6</version>
...@@ -256,15 +255,15 @@ ...@@ -256,15 +255,15 @@
</configuration> </configuration>
</plugin> </plugin>
<!-- execution of Unit Tests -->
<plugin> <plugin>
<!-- execution of Unit Tests -->
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-report-plugin</artifactId> <artifactId>maven-surefire-report-plugin</artifactId>
<version>2.7</version> <version>2.7</version>
</plugin> </plugin>
<plugin> <!-- check coverage of tests -->
<!-- check coverage of tests --> <!-- <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>cobertura-maven-plugin</artifactId> <artifactId>cobertura-maven-plugin</artifactId>
<configuration> <configuration>
...@@ -273,7 +272,7 @@ ...@@ -273,7 +272,7 @@
<format>xml</format> <format>xml</format>
</formats> </formats>
</configuration> </configuration>
</plugin> </plugin> -->
<!-- Generator for QA reports, summarizes various inputs and draws diagrams <!-- Generator for QA reports, summarizes various inputs and draws diagrams
indicating improvements/deterioration <plugin> <groupId>net.objectlab</groupId> indicating improvements/deterioration <plugin> <groupId>net.objectlab</groupId>
...@@ -282,10 +281,10 @@ ...@@ -282,10 +281,10 @@
<report>report-movers-all</report> </reports> </reportSet> </reportSets> <report>report-movers-all</report> </reports> </reportSet> </reportSets>
</plugin> --> </plugin> -->
<plugin> <!-- <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>dashboard-maven-plugin</artifactId> <artifactId>dashboard-maven-plugin</artifactId>
</plugin> </plugin> -->
</plugins> </plugins>
</reporting> </reporting>
...@@ -409,7 +408,7 @@ ...@@ -409,7 +408,7 @@
<profiles> <profiles>
<!-- Hudson by default defines a property BUILD_NUMBER which is used to <!-- Hudson by default defines a property BUILD_NUMBER which is used to
enable the profile. --> enable the profile. -->
<profile> <!-- <profile>
<id>hudson</id> <id>hudson</id>
<activation> <activation>
<property> <property>
...@@ -463,8 +462,8 @@ ...@@ -463,8 +462,8 @@
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
</profile> </profile> -->
<profile> <!-- <profile>
<id>nightly</id> <id>nightly</id>
<activation> <activation>
<property> <property>
...@@ -472,7 +471,7 @@ ...@@ -472,7 +471,7 @@
</property> </property>
</activation> </activation>
<build> <build>
<finalName>${artifactId}-${project.version}-nightly-${BUILD_NUMBER}</finalName> <finalName>${project.artifactId}-${project.version}-nightly-${BUILD_NUMBER}</finalName>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
...@@ -493,7 +492,7 @@ ...@@ -493,7 +492,7 @@
</plugins> </plugins>
</build> </build>
</profile> </profile>
</profiles> </profiles> -->
<modules> <modules>
<module>nephele</module> <module>nephele</module>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册