提交 e5ee55bd 编写于 作者: G gallenvara 提交者: Fabian Hueske

[FLINK-2853] Port MutableHashTablePerformanceBenchmark to JMH.

This closes #1267
上级 75a52574
......@@ -56,14 +56,21 @@ under the License.
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>0.10-SNAPSHOT</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependency>
</dependencies>
<build>
......
......@@ -16,10 +16,11 @@
* limitations under the License.
*/
package org.apache.flink.runtime.operators.hash;
package org.apache.flink.benchmark.runtime.operators.hash;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
......@@ -30,6 +31,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.MutableHashTable;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.types.StringPair;
import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
......@@ -37,13 +39,17 @@ import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparat
import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import static org.junit.Assert.fail;
@State(Scope.Thread)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class MutableHashTablePerformanceBenchmark {
private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
......@@ -59,8 +65,7 @@ public class MutableHashTablePerformanceBenchmark {
private static final String COMMENT = "this comments should contains a 96 byte data, 100 plus another integer value and seperator char.";
@Before
@Setup
public void setup() {
this.pairBuildSideAccesssor = new StringPairSerializer();
this.pairProbeSideAccesssor = new StringPairSerializer();
......@@ -72,7 +77,7 @@ public class MutableHashTablePerformanceBenchmark {
this.ioManager = new IOManagerAsync();
}
@After
@TearDown
public void tearDown() {
// shut down I/O manager and Memory Manager and verify the correct shutdown
this.ioManager.shutdown();
......@@ -84,8 +89,8 @@ public class MutableHashTablePerformanceBenchmark {
}
}
@Test
public void compareMutableHashTablePerformance1() throws IOException {
@Benchmark
public void compareMutableHashTableWithBloomFilter1() throws IOException {
// ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
// create a build input with 1000000 records with key spread between [0 -- 10000000] with step of 10 for nearby records.
int buildSize = 1000000;
......@@ -98,19 +103,63 @@ public class MutableHashTablePerformanceBenchmark {
int expectedResult = 500000;
long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
System.out.println("HybridHashJoin1:");
System.out.println("Build input size: " + 100 * buildSize);
System.out.println("Probe input size: " + 100 * probeSize);
System.out.println("Available memory: " + this.memManager.getMemorySize());
System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
}
@Benchmark
public void compareMutableHashTableWithoutBloomFilter1() throws IOException {
// ----------------------------------------------90% filtered during probe spill phase-----------------------------------------
// create a build input with 1000000 records with key spread between [0 -- 10000000] with step of 10 for nearby records.
int buildSize = 1000000;
int buildStep = 10;
int buildScope = buildStep * buildSize;
// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
int probeSize = 5000000;
int probeStep = 1;
int probeScope = buildSize;
int expectedResult = 500000;
this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
System.out.println("HybridHashJoin1:");
System.out.println("Build input size: " + 100 * buildSize);
System.out.println("Probe input size: " + 100 * probeSize);
System.out.println("Available memory: " + this.memManager.getMemorySize());
System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
}
@Benchmark
public void compareMutableHashTableWithBloomFilter2() throws IOException {
// ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
// create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records.
int buildSize = 1000000;
int buildStep = 5;
int buildScope = buildStep * buildSize;
// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
int probeSize = 5000000;
int probeStep = 1;
int probeScope = buildSize;
int expectedResult = 1000000;
this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
System.out.println("HybridHashJoin2:");
System.out.println("Build input size: " + 100 * buildSize);
System.out.println("Probe input size: " + 100 * probeSize);
System.out.println("Available memory: " + this.memManager.getMemorySize());
System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
}
@Test
public void compareMutableHashTablePerformance2() throws IOException {
@Benchmark
public void compareMutableHashTableWithoutBloomFilter2() throws IOException {
// ----------------------------------------------80% filtered during probe spill phase-----------------------------------------
// create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records.
int buildSize = 1000000;
......@@ -123,19 +172,40 @@ public class MutableHashTablePerformanceBenchmark {
int expectedResult = 1000000;
long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
System.out.println("HybridHashJoin2:");
System.out.println("Build input size: " + 100 * buildSize);
System.out.println("Probe input size: " + 100 * probeSize);
System.out.println("Available memory: " + this.memManager.getMemorySize());
System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
}
@Benchmark
public void compareMutableHashTableWithBloomFilter3() throws IOException {
// ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
// create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records.
int buildSize = 1000000;
int buildStep = 2;
int buildScope = buildStep * buildSize;
// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
int probeSize = 5000000;
int probeStep = 1;
int probeScope = buildSize;
int expectedResult = 2500000;
this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
System.out.println("HybridHashJoin3:");
System.out.println("Build input size: " + 100 * buildSize);
System.out.println("Probe input size: " + 100 * probeSize);
System.out.println("Available memory: " + this.memManager.getMemorySize());
System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
}
@Test
public void compareMutableHashTablePerformance3() throws IOException {
@Benchmark
public void compareMutableHashTableWithoutBloomFilter3() throws IOException {
// ----------------------------------------------50% filtered during probe spill phase-------------------------------------------------
// create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records.
int buildSize = 1000000;
......@@ -148,19 +218,40 @@ public class MutableHashTablePerformanceBenchmark {
int expectedResult = 2500000;
long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
System.out.println("HybridHashJoin3:");
System.out.println("Build input size: " + 100 * buildSize);
System.out.println("Probe input size: " + 100 * probeSize);
System.out.println("Available memory: " + this.memManager.getMemorySize());
System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
}
@Benchmark
public void compareMutableHashTableWithBloomFilter4() throws IOException {
// ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
// create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records.
int buildSize = 1000000;
int buildStep = 1;
int buildScope = buildStep * buildSize;
// create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records.
int probeSize = 5000000;
int probeStep = 1;
int probeScope = buildSize;
int expectedResult = probeSize / buildStep;
this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
System.out.println("HybridHashJoin4:");
System.out.println("Build input size: " + 100 * buildSize);
System.out.println("Probe input size: " + 100 * probeSize);
System.out.println("Available memory: " + this.memManager.getMemorySize());
System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
}
@Test
public void compareMutableHashTablePerformance4() throws IOException {
@Benchmark
public void compareMutableHashTableWithoutBloomFilter4() throws IOException {
// ----------------------------------------------0% filtered during probe spill phase-----------------------------------------
// create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records.
int buildSize = 1000000;
......@@ -173,15 +264,13 @@ public class MutableHashTablePerformanceBenchmark {
int expectedResult = probeSize / buildStep;
long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
System.out.println("HybridHashJoin5:");
System.out.println("HybridHashJoin4:");
System.out.println("Build input size: " + 100 * buildSize);
System.out.println("Probe input size: " + 100 * probeSize);
System.out.println("Available memory: " + this.memManager.getMemorySize());
System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent.");
System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
}
private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize,
......@@ -202,7 +291,6 @@ public class MutableHashTablePerformanceBenchmark {
// ----------------------------------------------------------------------------------------
long start = System.currentTimeMillis();
final MutableHashTable<StringPair, StringPair> join = new MutableHashTable<StringPair, StringPair>(
this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
......@@ -221,11 +309,10 @@ public class MutableHashTablePerformanceBenchmark {
Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult);
join.close();
long cost = System.currentTimeMillis() - start;
// ----------------------------------------------------------------------------------------
this.memManager.release(join.getFreedMemory());
return cost;
return 1;
}
......@@ -259,4 +346,14 @@ public class MutableHashTablePerformanceBenchmark {
return next(new StringPair());
}
}
public static void main(String[] args) throws Exception {
Options opt = new OptionsBuilder()
.include(MutableHashTablePerformanceBenchmark.class.getSimpleName())
.warmupIterations(2)
.measurementIterations(2)
.forks(1)
.build();
new Runner(opt).run();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册