Commit 925ac1f7 authored by Stephan Ewen

[FLINK-1916] [FLINK-2361] [runtime] Fix EOFException and entry loss in CompactingHashTable

Also includes a number of code cleanups in CompactingHashTable
Parent 5226f0b4
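For context, the API change at the core of this commit is that insertOrReplaceRecord no longer takes a caller-supplied temporary holder object, so callers no longer need a TypeSerializer just to create one. A minimal sketch of how calling code adapts, assuming an already opened CompactingHashTable<T> (the helper class below is illustrative and not part of the commit):

import java.io.IOException;

import org.apache.flink.runtime.operators.hash.CompactingHashTable;

public class UpsertSketch {

    // Inserts the record, or replaces the entry that has the same key.
    public static <T> void upsert(CompactingHashTable<T> table, T record) throws IOException {
        // before this commit: table.insertOrReplaceRecord(record, tempHolder);
        table.insertOrReplaceRecord(record);
    }
}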
......@@ -20,9 +20,6 @@ package org.apache.flink.api.common.typeutils;
import java.io.Serializable;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
public class GenericPairComparator<T1, T2> extends TypePairComparator<T1, T2>
implements Serializable {
......
......@@ -20,7 +20,6 @@ package org.apache.flink.runtime.iterative.io;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;
......@@ -39,22 +38,19 @@ public class SolutionSetFastUpdateOutputCollector<T> implements Collector<T> {
private final CompactingHashTable<T> solutionSet;
private final T tmpHolder;
public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer) {
this(solutionSet, serializer, null);
public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet) {
this(solutionSet, null);
}
public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer, Collector<T> delegate) {
public SolutionSetFastUpdateOutputCollector(CompactingHashTable<T> solutionSet, Collector<T> delegate) {
this.solutionSet = solutionSet;
this.delegate = delegate;
this.tmpHolder = serializer.createInstance();
}
@Override
public void collect(T record) {
try {
solutionSet.insertOrReplaceRecord(record, tmpHolder);
solutionSet.insertOrReplaceRecord(record);
if (delegate != null) {
delegate.collect(record);
}
......
......@@ -16,12 +16,10 @@
* limitations under the License.
*/
package org.apache.flink.runtime.iterative.io;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.util.Collector;
......@@ -41,22 +39,19 @@ public class SolutionSetUpdateOutputCollector<T> implements Collector<T> {
private final CompactingHashTable<T> solutionSet;
private final T tmpHolder;
public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer) {
this(solutionSet, serializer, null);
public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet) {
this(solutionSet, null);
}
public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, TypeSerializer<T> serializer, Collector<T> delegate) {
public SolutionSetUpdateOutputCollector(CompactingHashTable<T> solutionSet, Collector<T> delegate) {
this.solutionSet = solutionSet;
this.delegate = delegate;
this.tmpHolder = serializer.createInstance();
}
@Override
public void collect(T record) {
try {
solutionSet.insertOrReplaceRecord(record, tmpHolder);
solutionSet.insertOrReplaceRecord(record);
if (delegate != null) {
delegate.collect(record);
}
......
......@@ -329,8 +329,7 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
if (ss instanceof CompactingHashTable) {
@SuppressWarnings("unchecked")
CompactingHashTable<OT> solutionSet = (CompactingHashTable<OT>) ss;
TypeSerializer<OT> serializer = getOutputSerializer();
return new SolutionSetUpdateOutputCollector<OT>(solutionSet, serializer, delegate);
return new SolutionSetUpdateOutputCollector<OT>(solutionSet, delegate);
}
else if (ss instanceof JoinHashMap) {
@SuppressWarnings("unchecked")
......
......@@ -61,15 +61,13 @@ public abstract class AbstractMutableHashTable<T> {
public abstract void abort();
public abstract void buildTable(final MutableObjectIterator<T> input) throws IOException;
public abstract List<MemorySegment> getFreeMemory();
// ------------- Modifier -------------
public abstract void insert(T record) throws IOException;
public abstract void insertOrReplaceRecord(T record, T tempHolder) throws IOException;
public abstract void insertOrReplaceRecord(T record) throws IOException;
// ------------- Accessors -------------
......
......@@ -199,7 +199,7 @@ public class InMemoryPartition<T> {
*
* @param compacted compaction status
*/
public void setCompaction(boolean compacted) {
public void setIsCompacted(boolean compacted) {
this.compacted = compacted;
}
......@@ -281,9 +281,9 @@ public class InMemoryPartition<T> {
* @param numberOfSegments allocation count
*/
public void allocateSegments(int numberOfSegments) {
while(getBlockCount() < numberOfSegments) {
while (getBlockCount() < numberOfSegments) {
MemorySegment next = this.availableMemory.nextSegment();
if(next != null) {
if (next != null) {
this.partitionPages.add(next);
} else {
return;
......
......@@ -886,7 +886,7 @@ public class MutableHashTable<BT, PT> implements MemorySegmentSource {
// forward pointer set
final int overflowSegNum = (int) (originalForwardPointer >>> 32);
final int segOffset = (int) (originalForwardPointer & 0xffffffff);
final int segOffset = (int) originalForwardPointer;
final MemorySegment seg = p.overflowSegments[overflowSegNum];
final short obCount = seg.getShort(segOffset + HEADER_COUNT_OFFSET);
......
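The hunk above is a pure simplification: the forward pointer packs the overflow segment number into its upper 32 bits and the byte offset into its lower 32 bits, and a plain narrowing cast to int already keeps only the lower 32 bits, so the explicit "& 0xffffffff" mask was redundant. A small self-contained illustration of that encoding (the pack helper is illustrative, not taken from the Flink sources):

public class ForwardPointerSketch {

    // pack the overflow segment number (upper 32 bits) and the byte offset (lower 32 bits) into one long
    static long pack(int overflowSegNum, int segOffset) {
        return (((long) overflowSegNum) << 32) | (segOffset & 0xffffffffL);
    }

    public static void main(String[] args) {
        long pointer = pack(3, 128);
        int overflowSegNum = (int) (pointer >>> 32); // 3
        int segOffset = (int) pointer;               // 128, identical to (int) (pointer & 0xffffffff)
        System.out.println(overflowSegNum + " / " + segOffset);
    }
}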
......@@ -71,10 +71,12 @@ public class IntArrayList {
public static final IntArrayList EMPTY = new IntArrayList(0) {
@Override
public boolean add(int number) {
throw new UnsupportedOperationException();
}
@Override
public int removeLast() {
throw new UnsupportedOperationException();
}
};
......
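With this change the shared EMPTY instance rejects mutation by throwing UnsupportedOperationException. A short usage sketch, assuming only the constructor, add(int), and removeLast() shown in this diff (illustrative, not part of the commit):

IntArrayList list = new IntArrayList(4);
list.add(42);
int last = list.removeLast(); // removes and returns the last element, here 42

try {
    IntArrayList.EMPTY.add(1);
} catch (UnsupportedOperationException expected) {
    // EMPTY is a read-only sentinel
}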
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.operators.hash;
import static org.junit.Assert.*;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Test;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
public class CompactingHashTableTest {
private final TypeSerializer<Tuple2<Long, String>> serializer;
private final TypeComparator<Tuple2<Long, String>> comparator;
private final TypeComparator<Long> probeComparator;
private final TypePairComparator<Long, Tuple2<Long, String>> pairComparator;
public CompactingHashTableTest() {
TypeSerializer<?>[] fieldSerializers = { LongSerializer.INSTANCE, StringSerializer.INSTANCE };
@SuppressWarnings("unchecked")
Class<Tuple2<Long, String>> clazz = (Class<Tuple2<Long, String>>) (Class<?>) Tuple2.class;
this.serializer = new TupleSerializer<Tuple2<Long, String>>(clazz, fieldSerializers);
TypeComparator<?>[] comparators = { new LongComparator(true) };
TypeSerializer<?>[] comparatorSerializers = { LongSerializer.INSTANCE };
this.comparator = new TupleComparator<Tuple2<Long, String>>(new int[] {0}, comparators, comparatorSerializers);
this.probeComparator = new LongComparator(true);
this.pairComparator = new TypePairComparator<Long, Tuple2<Long, String>>() {
private long ref;
@Override
public void setReference(Long reference) {
ref = reference;
}
@Override
public boolean equalToReference(Tuple2<Long, String> candidate) {
//noinspection UnnecessaryUnboxing
return candidate.f0.longValue() == ref;
}
@Override
public int compareToReference(Tuple2<Long, String> candidate) {
long x = ref;
long y = candidate.f0;
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
};
}
// ------------------------------------------------------------------------
// tests
// ------------------------------------------------------------------------
@Test
public void testHashTableGrowthWithInsert() {
try {
final int numElements = 1000000;
List<MemorySegment> memory = getMemory(10000, 32 * 1024);
// we create a hash table that thinks the records are super large. that makes it choose initially
// a lot of memory for the partition buffers, and start with a smaller hash table. that way
// we trigger a hash table growth early.
CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
serializer, comparator, memory, 10000);
table.open();
for (long i = 0; i < numElements; i++) {
table.insert(new Tuple2<Long, String>(i, String.valueOf(i)));
}
// make sure that all elements are contained via the entry iterator
{
BitSet bitSet = new BitSet(numElements);
MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
Tuple2<Long, String> next;
while ((next = iter.next()) != null) {
assertNotNull(next.f0);
assertNotNull(next.f1);
assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
bitSet.set(next.f0.intValue());
}
assertEquals(numElements, bitSet.cardinality());
}
// make sure all entries are contained via the prober
{
CompactingHashTable<Tuple2<Long, String>>.HashTableProber<Long> proper =
table.getProber(probeComparator, pairComparator);
for (long i = 0; i < numElements; i++) {
assertNotNull(proper.getMatchFor(i));
assertNull(proper.getMatchFor(i + numElements));
}
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
/**
* This test validates that records are not lost via "insertOrReplace()" as in bug [FLINK-2361]
*/
@Test
public void testHashTableGrowthWithInsertOrReplace() {
try {
final int numElements = 1000000;
List<MemorySegment> memory = getMemory(10000, 32 * 1024);
// we create a hash table that thinks the records are super large. that makes it choose initially
// a lot of memory for the partition buffers, and start with a smaller hash table. that way
// we trigger a hash table growth early.
CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
serializer, comparator, memory, 10000);
table.open();
for (long i = 0; i < numElements; i++) {
table.insertOrReplaceRecord(new Tuple2<Long, String>(i, String.valueOf(i)));
}
// make sure that all elements are contained via the entry iterator
{
BitSet bitSet = new BitSet(numElements);
MutableObjectIterator<Tuple2<Long, String>> iter = table.getEntryIterator();
Tuple2<Long, String> next;
while ((next = iter.next()) != null) {
assertNotNull(next.f0);
assertNotNull(next.f1);
assertEquals(next.f0.longValue(), Long.parseLong(next.f1));
bitSet.set(next.f0.intValue());
}
assertEquals(numElements, bitSet.cardinality());
}
// make sure all entries are contained via the prober
{
CompactingHashTable<Tuple2<Long, String>>.HashTableProber<Long> proper =
table.getProber(probeComparator, pairComparator);
for (long i = 0; i < numElements; i++) {
assertNotNull(proper.getMatchFor(i));
assertNull(proper.getMatchFor(i + numElements));
}
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
/**
* This test validates that new inserts (rather than updates) in "insertOrReplace()" properly
* react to out of memory conditions.
*/
@Test
public void testInsertsWithInsertOrReplace() {
try {
final int numElements = 1000;
final String longString = getLongString(10000);
List<MemorySegment> memory = getMemory(1000, 32 * 1024);
// here the records are actually large (10 KB strings), but the table gets a small
// record-length hint (100 bytes). replacing the entries below fragments the partitions,
// so the final round of inserts only succeeds if the table compacts instead of running out of memory.
CompactingHashTable<Tuple2<Long, String>> table = new CompactingHashTable<Tuple2<Long, String>>(
serializer, comparator, memory, 100);
table.open();
// first, we insert some elements
for (long i = 0; i < numElements; i++) {
table.insertOrReplaceRecord(new Tuple2<Long, String>(i, longString));
}
// now, we replace the same elements, causing fragmentation
for (long i = 0; i < numElements; i++) {
table.insertOrReplaceRecord(new Tuple2<Long, String>(i, longString));
}
// now we insert an additional set of elements. without compaction during this insertion,
// the memory will run out
for (long i = 0; i < numElements; i++) {
table.insertOrReplaceRecord(new Tuple2<Long, String>(i + numElements, longString));
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
for (int i = 0; i < numSegments; i++) {
list.add(new MemorySegment(new byte[segmentSize]));
}
return list;
}
private static String getLongString(int length) {
StringBuilder bld = new StringBuilder(length);
for (int i = 0; i < length; i++) {
bld.append('a');
}
return bld.toString();
}
}
......@@ -27,10 +27,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.hash.MutableHashTable;
import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
......@@ -38,6 +34,7 @@ import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
import org.apache.flink.runtime.operators.testutils.types.IntPairPairComparator;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Test;
import static org.junit.Assert.*;
......@@ -72,8 +69,8 @@ public class HashTablePerformanceComparison {
MutableObjectIterator<IntPair> updateTester = new UniformIntPairGenerator(NUM_PAIRS, 1, false);
long start = 0L;
long end = 0L;
long start;
long end;
long first = System.currentTimeMillis();
......@@ -105,7 +102,7 @@ public class HashTablePerformanceComparison {
start = System.currentTimeMillis();
while(updater.next(target) != null) {
target.setValue(target.getValue()*-1);
table.insertOrReplaceRecord(target, temp);
table.insertOrReplaceRecord(target);
}
end = System.currentTimeMillis();
System.out.println("Update done. Time: " + (end-start) + " ms");
......@@ -147,8 +144,8 @@ public class HashTablePerformanceComparison {
MutableObjectIterator<IntPair> updateTester = new UniformIntPairGenerator(NUM_PAIRS, 1, false);
long start = 0L;
long end = 0L;
long start;
long end;
long first = System.currentTimeMillis();
......
......@@ -27,9 +27,6 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.operators.hash.AbstractHashTableProber;
import org.apache.flink.runtime.operators.hash.AbstractMutableHashTable;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.testutils.UniformStringPairGenerator;
import org.apache.flink.runtime.operators.testutils.types.IntList;
import org.apache.flink.runtime.operators.testutils.types.IntListComparator;
......@@ -45,7 +42,9 @@ import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.*;
......@@ -235,9 +234,8 @@ public class MemoryHashTableTest {
final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
// test replacing
IntList tempHolder = new IntList();
for (int i = 0; i < NUM_LISTS; i++) {
table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
table.insertOrReplaceRecord(overwriteLists[i]);
}
for (int i = 0; i < NUM_LISTS; i++) {
......@@ -278,10 +276,9 @@ public class MemoryHashTableTest {
final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
// test replacing
IntList tempHolder = new IntList();
for (int i = 0; i < NUM_LISTS; i++) {
if( i % 100 != 0) {
table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
table.insertOrReplaceRecord(overwriteLists[i]);
lists[i] = overwriteLists[i];
}
}
......@@ -327,10 +324,9 @@ public class MemoryHashTableTest {
final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS/STEP_SIZE, rnd);
// test replacing
IntList tempHolder = new IntList();
for (int i = 0; i < NUM_LISTS; i += STEP_SIZE) {
overwriteLists[i/STEP_SIZE].setKey(overwriteLists[i/STEP_SIZE].getKey()*STEP_SIZE);
table.insertOrReplaceRecord(overwriteLists[i/STEP_SIZE], tempHolder);
table.insertOrReplaceRecord(overwriteLists[i/STEP_SIZE]);
lists[i] = overwriteLists[i/STEP_SIZE];
}
......@@ -379,9 +375,8 @@ public class MemoryHashTableTest {
for(int k = 0; k < NUM_REWRITES; k++) {
overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
// test replacing
IntList tempHolder = new IntList();
for (int i = 0; i < NUM_LISTS; i++) {
table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
table.insertOrReplaceRecord(overwriteLists[i]);
}
for (int i = 0; i < NUM_LISTS; i++) {
......@@ -409,11 +404,7 @@ public class MemoryHashTableTest {
table.open();
for (int i = 0; i < NUM_LISTS; i++) {
try {
table.insert(lists[i]);
} catch (Exception e) {
throw e;
}
}
final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
......@@ -630,9 +621,8 @@ public class MemoryHashTableTest {
final IntList[] overwriteLists = getRandomizedIntLists(NUM_LISTS, rnd);
// test replacing
IntList tempHolder = new IntList();
for (int i = 0; i < NUM_LISTS; i++) {
table.insertOrReplaceRecord(overwriteLists[i], tempHolder);
table.insertOrReplaceRecord(overwriteLists[i]);
}
Field list = Whitebox.getField(CompactingHashTable.class, "partitions");
......@@ -691,7 +681,7 @@ public class MemoryHashTableTest {
while(updater.next(target) != null) {
target.setValue(target.getValue());
table.insertOrReplaceRecord(target, temp);
table.insertOrReplaceRecord(target);
}
while (updateTester.next(target) != null) {
......