diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
index 76618082dde79869d24d30de643f88d60d433b04..2ad01aae46e20d9fce5bb9fa1767f9d7f049650a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java
@@ -1120,9 +1120,14 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
     final protected void buildBloomFilterForBucketsInPartition(int partNum, HashPartition<BT, PT> partition) {
         // Find all the buckets which belongs to this partition, and build bloom filter for each bucket(include its overflow buckets).
         final int bucketsPerSegment = this.bucketsPerSegmentMask + 1;
-        for (MemorySegment segment : this.buckets) {
-            for (int i = 0; i < bucketsPerSegment; i++) {
-                final int bucketInSegmentOffset = i * HASH_BUCKET_SIZE;
+
+        int numSegs = this.buckets.length;
+        // go over all segments that are part of the table
+        for (int i = 0, bucket = 0; i < numSegs && bucket < numBuckets; i++) {
+            final MemorySegment segment = this.buckets[i];
+            // go over all buckets in the segment
+            for (int k = 0; k < bucketsPerSegment && bucket < numBuckets; k++, bucket++) {
+                final int bucketInSegmentOffset = k * HASH_BUCKET_SIZE;
                 byte partitionNumber = segment.get(bucketInSegmentOffset + HEADER_PARTITION_OFFSET);
                 if (partitionNumber == partNum) {
                     byte status = segment.get(bucketInSegmentOffset + HEADER_STATUS_OFFSET);
@@ -1140,6 +1145,10 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
      */
     final void buildBloomFilterForBucket(int bucketInSegmentPos, MemorySegment bucket, HashPartition<BT, PT> p) {
         final int count = bucket.getShort(bucketInSegmentPos + HEADER_COUNT_OFFSET);
+        if (count <= 0) {
+            return;
+        }
+
         int[] hashCodes = new int[count];
         // As the hashcode and bloom filter occupy same bytes, so we read all hashcode out at first and then write back to bloom filter.
         for (int i = 0; i < count; i++) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
index 233fa4dc222014d410c4053b9c73ac9ea018084e..52f6ffc8c88ce5c3c658fab06e5474c0c24b30c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java
@@ -1476,6 +1476,65 @@ public class HashTableITCase {
         this.memManager.release(join.getFreedMemory());
     }
+
+    @Test
+    public void testBucketsNotFulfillSegment() throws Exception {
+        final int NUM_KEYS = 10000;
+        final int BUILD_VALS_PER_KEY = 3;
+        final int PROBE_VALS_PER_KEY = 10;
+
+        // create a build input that gives 30000 pairs with 3 values sharing the same key
+        MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+
+        // create a probe input that gives 100000 pairs with 10 values sharing a key
+        MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);
+
+        // allocate the memory for the HashTable
+        List<MemorySegment> memSegments;
+        try {
+            // 33 is the minimum number of pages required to perform the hash join on these inputs
+            memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
+        }
+        catch (MemoryAllocationException maex) {
+            fail("Memory for the Join could not be provided.");
+            return;
+        }
+
+        // For FLINK-2545, the bucket data may not fill the whole buffer. For example, the buffer may contain 256 buckets,
+        // while the hash table assigns only 250 buckets to it. The unused buffer bytes may contain arbitrary data, which
+        // can corrupt the hash table if it is not skipped. To mock this, put invalid bucket data (partition=1, inMemory=true,
+        // count=-1) at the end of the buffer.
+        for (MemorySegment segment : memSegments) {
+            int newBucketOffset = segment.size() - 128;
+            // initialize the header fields
+            segment.put(newBucketOffset + 0, (byte)0);
+            segment.put(newBucketOffset + 1, (byte)0);
+            segment.putShort(newBucketOffset + 2, (short) -1);
+            segment.putLong(newBucketOffset + 4, ~0x0L);
+        }
+
+        // ----------------------------------------------------------------------------------------
+
+        final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
+                this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
+                this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
+                memSegments, ioManager);
+        join.open(buildInput, probeInput);
+
+        final IntPair recordReuse = new IntPair();
+        int numRecordsInJoinResult = 0;
+
+        while (join.nextRecord()) {
+            HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
+            while (buildSide.next(recordReuse) != null) {
+                numRecordsInJoinResult++;
+            }
+        }
+        Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);
+
+        join.close();
+        this.memManager.release(join.getFreedMemory());
+    }
+
     // ============================================================================================
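
The test above forges an unused bucket header by writing directly at fixed byte offsets near the end of each segment. Below is a minimal, self-contained sketch of that header layout, assuming the 128-byte bucket size and the header offsets referenced in the diff (partition byte at 0, status byte at 1, count short at 2, overflow pointer long at 4); the class, method, and constant names are illustrative stand-ins, not Flink APIs, and a plain ByteBuffer stands in for a MemorySegment.

// Illustrative sketch (not part of the patch): the bucket header layout the test forges.
// The offsets and the 128-byte bucket size are assumptions mirroring the constants
// referenced in the diff above; all names here are hypothetical.
import java.nio.ByteBuffer;

public class BucketHeaderLayoutSketch {

    static final int HASH_BUCKET_SIZE = 128;       // assumed size of one bucket in bytes
    static final int HEADER_PARTITION_OFFSET = 0;  // byte: partition the bucket belongs to
    static final int HEADER_STATUS_OFFSET = 1;     // byte: in-memory / spilled status flag
    static final int HEADER_COUNT_OFFSET = 2;      // short: number of entries in the bucket
    static final int HEADER_FORWARD_OFFSET = 4;    // long: pointer to the overflow bucket

    // Writes a header the same way the test writes into the trailing 128 bytes of each segment.
    static void writeUnusedBucketHeader(ByteBuffer segment, int bucketOffset) {
        segment.put(bucketOffset + HEADER_PARTITION_OFFSET, (byte) 0);
        segment.put(bucketOffset + HEADER_STATUS_OFFSET, (byte) 0);
        segment.putShort(bucketOffset + HEADER_COUNT_OFFSET, (short) -1); // invalid entry count
        segment.putLong(bucketOffset + HEADER_FORWARD_OFFSET, ~0x0L);     // no overflow bucket
    }

    public static void main(String[] args) {
        ByteBuffer segment = ByteBuffer.allocate(32 * 1024); // stand-in for a 32 KB memory segment
        writeUnusedBucketHeader(segment, segment.capacity() - HASH_BUCKET_SIZE);
        short count = segment.getShort(segment.capacity() - HASH_BUCKET_SIZE + HEADER_COUNT_OFFSET);
        // With the fix in the diff, buildBloomFilterForBucket() returns early for count <= 0 and
        // the bucket loop stops at numBuckets, so garbage headers like this one are never read.
        System.out.println("forged count field = " + count);
    }
}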