提交 641a0d43 编写于 作者: G Greg Hogan 提交者: Stephan Ewen

[FLINK-3623] [runtime] Adjust MurmurHash Algorithm

Fix "hash *= n" to be "hash = hash * m + n".

This closes #1806
上级 6593e482
......@@ -49,9 +49,9 @@ public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory("3> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" +
"3> 1431162155\n" + "4> -1557280266\n" + "4> -1728529858\n" + "4> 1654374947\n" +
"4> -65105105\n" + "4> -518907128\n" + "4> -252332814\n", this.resultPath);
compareResultsByLinesInMemory("4> -1155484576\n" + "3> 1033096058\n" + "3> -1930858313\n" +
"4> 1431162155\n" + "3> -1557280266\n" + "4> -1728529858\n" + "3> 1654374947\n" +
"3> -65105105\n" + "3> -518907128\n" + "4> -252332814\n", this.resultPath);
}
@Override
......
......@@ -147,7 +147,7 @@ public final class MathUtils {
code *= 0x1b873593;
code = Integer.rotateLeft(code, 13);
code *= 0xe6546b64;
code = code * 5 + 0xe6546b64;
code ^= 4;
code ^= code >>> 16;
......
......@@ -222,7 +222,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
for (int i = 0; i < numElements; i++) {
Tuple2<Integer, Integer> result = new Tuple2<>(MathUtils.murmurHash(i) % numKeys, i);
// keys '1' and '2' hash to different buckets
Tuple2<Integer, Integer> result = new Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i);
ctx.collect(result);
}
}
......
......@@ -74,7 +74,8 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
override def run(ctx: SourceContext[(Int, Int)]): Unit = {
0 until numElements foreach {
i => ctx.collect((MathUtils.murmurHash(i) % numKeys, i))
// keys '1' and '2' hash to different buckets
i => ctx.collect((1 + (MathUtils.murmurHash(i)) % numKeys, i))
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册