提交 c9589b82 编写于 作者: F Fabian Hueske

Fixed RecordOutputEmitter and extended OutputEmitterTest for hash corner cases

上级 8677ad9a
......@@ -147,7 +147,16 @@ public class RecordOutputEmitter implements ChannelSelector<Record> {
for (int i = 0; i < DEFAULT_SALT.length; i++) {
hash ^= ((hash << 5) + DEFAULT_SALT[i] + (hash >> 2));
}
this.channels[0] = (hash < 0) ? -hash % numberOfChannels : hash % numberOfChannels;
if(hash < 0) {
if(hash == Integer.MIN_VALUE) {
this.channels[0] = Integer.MAX_VALUE % numberOfChannels;
} else {
this.channels[0] = -hash % numberOfChannels;
}
} else {
this.channels[0] = hash % numberOfChannels;
}
return this.channels;
}
......
......@@ -22,13 +22,18 @@ import java.io.PipedOutputStream;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.commons.lang.NotImplementedException;
import org.junit.Test;
import eu.stratosphere.api.common.typeutils.TypeComparator;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.api.common.typeutils.base.IntSerializer;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparatorFactory;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
import eu.stratosphere.core.memory.DataInputView;
import eu.stratosphere.core.memory.DataOutputView;
import eu.stratosphere.core.memory.MemorySegment;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
import eu.stratosphere.pact.runtime.shipping.OutputEmitter;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.types.DeserializationException;
......@@ -102,6 +107,42 @@ public class OutputEmitterTest extends TestCase {
}
assertTrue(cnt == numRecs);
// test hash corner cases
final TestIntComparator testIntComp = new TestIntComparator();
final ChannelSelector<SerializationDelegate<Integer>> oe3 = new OutputEmitter<Integer>(ShipStrategyType.PARTITION_HASH, testIntComp);
final SerializationDelegate<Integer> intDel = new SerializationDelegate<Integer>(new IntSerializer());
numChans = 100;
// MinVal hash
intDel.setInstance(Integer.MIN_VALUE);
int[] chans = oe3.selectChannels(intDel, numChans);
assertTrue(chans.length == 1);
assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
// -1 hash
intDel.setInstance(-1);
chans = oe3.selectChannels(intDel, hit.length);
assertTrue(chans.length == 1);
assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
// 0 hash
intDel.setInstance(0);
chans = oe3.selectChannels(intDel, hit.length);
assertTrue(chans.length == 1);
assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
// 1 hash
intDel.setInstance(1);
chans = oe3.selectChannels(intDel, hit.length);
assertTrue(chans.length == 1);
assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
// MaxVal hash
intDel.setInstance(Integer.MAX_VALUE);
chans = oe3.selectChannels(intDel, hit.length);
assertTrue(chans.length == 1);
assertTrue(chans[0] >= 0 && chans[0] <= numChans-1);
}
@Test
......@@ -332,6 +373,68 @@ public class OutputEmitterTest extends TestCase {
Assert.fail("Expected a NullKeyFieldException.");
}
@SuppressWarnings("serial")
private static class TestIntComparator extends TypeComparator<Integer> {
@Override
public int hash(Integer record) {
return record;
}
@Override
public void setReference(Integer toCompare) { throw new NotImplementedException(); }
@Override
public boolean equalToReference(Integer candidate) { throw new NotImplementedException(); }
@Override
public int compareToReference( TypeComparator<Integer> referencedComparator) {
throw new NotImplementedException();
}
@Override
public int compare(Integer first, Integer second) { throw new NotImplementedException(); }
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) {
throw new NotImplementedException();
}
@Override
public boolean supportsNormalizedKey() { throw new NotImplementedException(); }
@Override
public boolean supportsSerializationWithKeyNormalization() { throw new NotImplementedException(); }
@Override
public int getNormalizeKeyLen() { throw new NotImplementedException(); }
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) { throw new NotImplementedException(); }
@Override
public void putNormalizedKey(Integer record, MemorySegment target, int offset, int numBytes) {
throw new NotImplementedException();
}
@Override
public void writeWithKeyNormalization(Integer record, DataOutputView target) throws IOException {
throw new NotImplementedException();
}
@Override
public Integer readWithKeyDenormalization(Integer reuse, DataInputView source) throws IOException {
throw new NotImplementedException();
}
@Override
public boolean invertNormalizedKey() { throw new NotImplementedException(); }
@Override
public TypeComparator<Integer> duplicate() { throw new NotImplementedException(); }
}
// @Test
// public void testPartitionRange() {
// final Random rnd = new Random(SEED);
......@@ -379,4 +482,5 @@ public class OutputEmitterTest extends TestCase {
//
// }
// }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册