提交 233c0147 编写于 作者: C chengxiang li

[FLINK-3281] IndexOutOfBoundsException when range partition on empty DataSet

上级 6c0a83e4
......@@ -60,13 +60,15 @@ public class RangeBoundaryBuilder<T> extends RichMapPartitionFunction<T, Object[
int boundarySize = parallelism - 1;
Object[][] boundaries = new Object[boundarySize][];
double avgRange = sampledData.size() / (double) parallelism;
int numKey = comparator.getFlatComparators().length;
for (int i = 1; i < parallelism; i++) {
T record = sampledData.get((int) (i * avgRange));
Object[] keys = new Object[numKey];
comparator.extractKeys(record, keys, 0);
boundaries[i-1] = keys;
if (sampledData.size() > 0) {
double avgRange = sampledData.size() / (double) parallelism;
int numKey = comparator.getFlatComparators().length;
for (int i = 1; i < parallelism; i++) {
T record = sampledData.get((int) (i * avgRange));
Object[] keys = new Object[numKey];
comparator.extractKeys(record, keys, 0);
boundaries[i-1] = keys;
}
}
out.collect(boundaries);
......
......@@ -49,6 +49,38 @@ class PartitionITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(
TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath)
}
@Test
def testEmptyHashPartition(): Unit = {
/*
* Test hash partition by tuple field
*/
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromCollection(Seq[Tuple1[String]]())
val unique = ds.partitionByHash(0)
unique.writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
expected = ""
}
@Test
def testEmptyRangePartition(): Unit = {
/*
* Test hash partition by tuple field
*/
val env = ExecutionEnvironment.getExecutionEnvironment
val ds = env.fromCollection(Seq[Tuple1[String]]())
val unique = ds.partitionByRange(0)
unique.writeAsText(resultPath, WriteMode.OVERWRITE)
env.execute()
expected = ""
}
@Test
def testHashPartitionByTupleField(): Unit = {
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册