提交 0d3ff88b 编写于 作者: A Aleksandr Chermenin 提交者: Maximilian Michels

[FLINK-2608] Updated Twitter Chill version.

[FLINK-2608] Updated test with Java collections.

[FLINK-2608] Updated Chill and Kryo dependencies.

[FLINK-2608] Added collections serialization test.

This closes #2623.
上级 4653ad38
......@@ -149,6 +149,19 @@ under the License.
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
<version>${chill.version}</version>
<exclusions>
<!-- Exclude Kryo dependency from Chill -->
<exclusion>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo-shaded</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Include our own version of Kryo -->
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</dependency>
<dependency>
......
......@@ -998,6 +998,44 @@ public class GroupReduceITCase extends MultipleProgramsTestBase {
compareResultAsText(result, expected);
}
/**
 * Runs a group reduce over POJOs whose collection fields were created via
 * {@code Arrays.asList()} (this exercises the Kryo serialization path).
 */
@Test
public void testJavaArraysAsListCollectionsWithinPojos() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // group the POJOs on their "key" field and reduce each group to a string
    final DataSet<CollectionDataSets.PojoWithCollection> input =
        CollectionDataSets.getPojoWithArraysAsListCollection(env);
    final DataSet<String> reduced = input.groupBy("key").reduceGroup(new GroupReducer7());

    final List<String> actual = reduced.collect();
    final String expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";

    compareResultAsText(actual, expected);
}
/**
 * Runs a group reduce over POJOs whose collection fields were created via the
 * {@code Collections.unmodifiable...()} methods (this exercises the Kryo serialization path).
 */
@Test
public void testJavaUnmodifiableCollectionsWithinPojos() throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // group the POJOs on their "key" field and reduce each group to a string
    final DataSet<CollectionDataSets.PojoWithCollection> input =
        CollectionDataSets.getPojoWithUnmodifiableCollection(env);
    final DataSet<String> reduced = input.groupBy("key").reduceGroup(new GroupReducer7());

    final List<String> actual = reduced.collect();
    final String expected = "callFor key 0 we got: pojo.a=apojo.a=bFor key 0 we got: pojo.a=a2pojo.a=b2\n";

    compareResultAsText(actual, expected);
}
public static class GroupReducer7 implements GroupReduceFunction<CollectionDataSets.PojoWithCollection, String> {
@Override
......
......@@ -23,6 +23,8 @@ import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
......@@ -46,10 +48,10 @@ import scala.math.BigInt;
/**
* #######################################################################################################
*
* BE AWARE THAT OTHER TESTS DEPEND ON THIS TEST DATA.
* IF YOU MODIFY THE DATA, MAKE SURE YOU CHECK THAT ALL TESTS ARE STILL WORKING!
*
* #######################################################################################################
*/
public class CollectionDataSets {
......@@ -201,7 +203,7 @@ public class CollectionDataSets {
return env.fromCollection(data, type);
}
public static DataSet<Tuple2<byte[], Integer>> getTuple2WithByteArrayDataSet(ExecutionEnvironment env) {
List<Tuple2<byte[], Integer>> data = new ArrayList<>();
data.add(new Tuple2<>(new byte[]{0, 4}, 1));
......@@ -210,12 +212,12 @@ public class CollectionDataSets {
data.add(new Tuple2<>(new byte[]{2, 1}, 3));
data.add(new Tuple2<>(new byte[]{0}, 0));
data.add(new Tuple2<>(new byte[]{2, 0}, 1));
TupleTypeInfo<Tuple2<byte[], Integer>> type = new TupleTypeInfo<>(
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO
);
return env.fromCollection(data, type);
}
......@@ -347,13 +349,13 @@ public class CollectionDataSets {
data.add(new Tuple7<>(3, "Third", 30, 300, 3000L, "Three", 30000L));
return env.fromCollection(data);
}
/**
 * Creates a data set of three {@code Tuple7} records with the field layout
 * (Long, Integer, Integer, Long, String, Integer, String).
 *
 * @param env the execution environment used to create the data set
 * @return the data set holding the three tuples, in insertion order of the backing list
 */
public static DataSet<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> getSmallTuplebasedDataSetMatchingPojo(ExecutionEnvironment env) {
    final Tuple7<Long, Integer, Integer, Long, String, Integer, String> first =
        new Tuple7<>(10000L, 10, 100, 1000L, "One", 1, "First");
    final Tuple7<Long, Integer, Integer, Long, String, Integer, String> second =
        new Tuple7<>(20000L, 20, 200, 2000L, "Two", 2, "Second");
    final Tuple7<Long, Integer, Integer, Long, String, Integer, String> third =
        new Tuple7<>(30000L, 30, 300, 3000L, "Three", 3, "Third");

    final List<Tuple7<Long, Integer, Integer, Long, String, Integer, String>> tuples = new ArrayList<>();
    tuples.add(first);
    tuples.add(second);
    tuples.add(third);

    return env.fromCollection(tuples);
}
......@@ -610,22 +612,22 @@ public class CollectionDataSets {
public Date date;
public Category cat;
}
/**
 * Creates a data set of three {@code PojoWithDateAndEnum} records: two in group "a" with
 * category {@code CAT_A} and one in group "b" with category {@code CAT_B}. Every record
 * uses {@code new Date(666)} as its date.
 *
 * @param env the execution environment used to create the data set
 * @return the data set holding the three records
 */
public static DataSet<PojoWithDateAndEnum> getPojoWithDateAndEnum(ExecutionEnvironment env) {
    final PojoWithDateAndEnum first = new PojoWithDateAndEnum();
    first.group = "a";
    first.date = new Date(666);
    first.cat = Category.CAT_A;

    final PojoWithDateAndEnum second = new PojoWithDateAndEnum();
    second.group = "a";
    second.date = new Date(666);
    second.cat = Category.CAT_A;

    final PojoWithDateAndEnum third = new PojoWithDateAndEnum();
    third.group = "b";
    third.date = new Date(666);
    third.cat = Category.CAT_B;

    final List<PojoWithDateAndEnum> pojos = new ArrayList<>();
    pojos.add(first);
    pojos.add(second);
    pojos.add(third);

    return env.fromCollection(pojos);
}
......@@ -693,7 +695,7 @@ public class CollectionDataSets {
pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc1.scalaBigInt = BigInt.int2bigInt(10);
pwc1.bigDecimalKeepItNull = null;
// use calendar to make it stable across time zones
GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
......@@ -710,7 +712,103 @@ public class CollectionDataSets {
pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;
GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
data.add(pwc1);
data.add(pwc2);
return env.fromCollection(data);
}
/**
 * Creates a data set of two {@code PojoWithCollection} records (both with key 0) whose
 * list fields are the fixed-size lists returned by {@code Arrays.asList()}.
 *
 * <p>The first record additionally carries a "mixed" list holding a map, a file, a string
 * and a nested {@code Arrays.asList()} of integers; the second record leaves it unset.
 *
 * @param env the execution environment used to create the data set
 * @return the data set holding the two records
 */
public static DataSet<PojoWithCollection> getPojoWithArraysAsListCollection(ExecutionEnvironment env) {
    final List<Pojo1> firstPojos = Arrays.asList(new Pojo1("a", "aa"), new Pojo1("b", "bb"));
    final List<Pojo1> secondPojos = Arrays.asList(new Pojo1("a2", "aa2"), new Pojo1("b2", "bb2"));

    // --- first record ---
    final PojoWithCollection first = new PojoWithCollection();
    first.key = 0;
    first.pojos = firstPojos;
    first.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
    first.scalaBigInt = BigInt.int2bigInt(10);
    first.bigDecimalKeepItNull = null;

    // build the SQL date from a calendar so the value is stable across time zones
    final GregorianCalendar firstCalendar = new GregorianCalendar(2033, 4, 18);
    first.sqlDate = new java.sql.Date(firstCalendar.getTimeInMillis());

    final Map<String, Integer> singleEntryMap = new HashMap<>();
    singleEntryMap.put("someKey", 1);
    first.mixed = Arrays.asList(
        singleEntryMap,
        new File(""),
        "uhlala",
        Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4) // nested Arrays.asList() of (boxed) integers
    );

    // --- second record ---
    final PojoWithCollection second = new PojoWithCollection();
    second.key = 0;
    second.pojos = secondPojos;
    second.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
    second.scalaBigInt = BigInt.int2bigInt(31104000);
    second.bigDecimalKeepItNull = null;

    final GregorianCalendar secondCalendar = new GregorianCalendar(1976, 4, 3);
    second.sqlDate = new java.sql.Date(secondCalendar.getTimeInMillis()); // 1976

    final List<PojoWithCollection> records = new ArrayList<>();
    records.add(first);
    records.add(second);

    return env.fromCollection(records);
}
public static DataSet<PojoWithCollection> getPojoWithUnmodifiableCollection(ExecutionEnvironment env) {
List<PojoWithCollection> data = new ArrayList<>();
List<Pojo1> pojosList1 = new ArrayList<>();
pojosList1.add(new Pojo1("a", "aa"));
pojosList1.add(new Pojo1("b", "bb"));
List<Pojo1> pojosList2 = new ArrayList<>();
pojosList2.add(new Pojo1("a2", "aa2"));
pojosList2.add(new Pojo1("b2", "bb2"));
PojoWithCollection pwc1 = new PojoWithCollection();
pwc1.pojos = Collections.unmodifiableList(pojosList1);
pwc1.key = 0;
pwc1.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc1.scalaBigInt = BigInt.int2bigInt(10);
pwc1.bigDecimalKeepItNull = null;
// use calendar to make it stable across time zones
GregorianCalendar gcl1 = new GregorianCalendar(2033, 4, 18);
pwc1.sqlDate = new java.sql.Date(gcl1.getTimeInMillis());
ArrayList<Object> mixedList = new ArrayList<>();
Map<String, Integer> map = new HashMap<>();
map.put("someKey", 1); // map.put("anotherKey", 2); map.put("third", 3);
mixedList.add(map);
mixedList.add(new File("/this/is/wrong"));
mixedList.add("uhlala");
pwc1.mixed = Collections.unmodifiableList(mixedList);
PojoWithCollection pwc2 = new PojoWithCollection();
pwc2.pojos = Collections.unmodifiableList(pojosList2);
pwc2.key = 0;
pwc2.bigInt = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.TEN);
pwc2.scalaBigInt = BigInt.int2bigInt(31104000);
pwc2.bigDecimalKeepItNull = null;
GregorianCalendar gcl2 = new GregorianCalendar(1976, 4, 3);
pwc2.sqlDate = new java.sql.Date(gcl2.getTimeInMillis()); // 1976
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.test.runtime.kryo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.esotericsoftware.kryo.Kryo;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.AbstractGenericTypeSerializerTest;
import org.apache.flink.api.java.typeutils.runtime.TestDataOutputSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Random;
import java.util.Set;
/**
 * Tests the {@link KryoSerializer} with various Java collection types, including the
 * special-purpose collections returned by {@code Arrays.asList()},
 * {@code Collections.unmodifiableSet()} and {@code Collections.singletonList()}.
 *
 * <p>Also checks that EOF exceptions raised by the underlying data views are forwarded
 * as {@link java.io.EOFException}, and that Kryo reference tracking is enabled.
 */
@SuppressWarnings("unchecked")
public class KryoCollectionsSerializerTest extends AbstractGenericTypeSerializerTest {

    /** Execution config handed to every serializer created via {@link #createSerializer(Class)}. */
    private final ExecutionConfig ec = new ExecutionConfig();

    @Test
    public void testJavaList(){
        Collection<Integer> a = new ArrayList<>();

        fillCollection(a);

        runTests(a);
    }

    @Test
    public void testJavaSet(){
        Collection<Integer> b = new HashSet<>();

        fillCollection(b);

        runTests(b);
    }

    @Test
    public void testJavaDequeue(){
        Collection<Integer> c = new LinkedList<>();

        fillCollection(c);

        runTests(c);
    }

    @Test
    public void testJavaArraysAsList(){
        Collection<Integer> a = Arrays.asList(42, 1337, 49, 1);

        runTests(a);
    }

    @Test
    public void testJavaUnmodifiableSet(){
        Set<Integer> b = new HashSet<>();

        fillCollection(b);

        runTests(Collections.unmodifiableSet(b));
    }

    @Test
    public void testJavaSingletonList(){
        Collection<Integer> c = Collections.singletonList(42);

        runTests(c);
    }

    /** Adds the fixed test values 42, 1337, 49 and 1 to the given collection. */
    private void fillCollection(Collection<Integer> coll) {
        coll.add(42);
        coll.add(1337);
        coll.add(49);
        coll.add(1);
    }

    @Override
    protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
        return new KryoSerializer<T>(type, ec);
    }

    /**
     * Make sure that the kryo serializer forwards EOF exceptions properly when serializing.
     *
     * <p>Any exception other than the expected {@link java.io.EOFException} propagates out of
     * the test method, so the test runner reports it together with its full stack trace.
     */
    @Test
    public void testForwardEOFExceptionWhileSerializing() throws Exception {
        // construct a long string
        char[] charData = new char[40000];
        Random rnd = new Random();
        for (int i = 0; i < charData.length; i++) {
            charData[i] = (char) rnd.nextInt(10000);
        }
        String str = new String(charData);

        // construct a memory target that is too small for the string
        TestDataOutputSerializer target = new TestDataOutputSerializer(10000, 30000);
        KryoSerializer<String> serializer = new KryoSerializer<String>(String.class, new ExecutionConfig());

        try {
            serializer.serialize(str, target);
            fail("should throw a java.io.EOFException");
        }
        catch (java.io.EOFException e) {
            // that is how we like it
        }
    }

    /**
     * Make sure that the kryo serializer forwards EOF exceptions properly when deserializing.
     *
     * <p>Any exception other than the expected {@link java.io.EOFException} propagates out of
     * the test method, so the test runner reports it together with its full stack trace.
     */
    @Test
    public void testForwardEOFExceptionWhileDeserializing() throws Exception {
        int numElements = 100;

        // memory target sized at 5 bytes per element; it receives all serialized integers
        TestDataOutputSerializer target = new TestDataOutputSerializer(5*numElements, 5*numElements);
        KryoSerializer<Integer> serializer = new KryoSerializer<>(Integer.class, new ExecutionConfig());

        for (int i = 0; i < numElements; i++) {
            serializer.serialize(i, target);
        }

        ComparatorTestBase.TestInputView source = new ComparatorTestBase.TestInputView(target.copyByteBuffer());

        // every written element must be readable again, in order
        for (int i = 0; i < numElements; i++) {
            int value = serializer.deserialize(source);
            assertEquals(i, value);
        }

        // one read beyond the written elements is expected to hit the end of the input
        try {
            serializer.deserialize(source);
            fail("should throw a java.io.EOFException");
        }
        catch (java.io.EOFException e) {
            // that is how we like it :-)
        }
    }

    /** Checks that the Kryo instance used by the serializer has reference tracking enabled. */
    @Test
    public void validateReferenceMappingEnabled() {
        KryoSerializer<String> serializer = new KryoSerializer<>(String.class, new ExecutionConfig());
        Kryo kryo = serializer.getKryo();
        assertTrue(kryo.getReferences());
    }
}
......@@ -101,7 +101,7 @@ under the License.
<!-- Default scala versions, may be overwritten by build profiles -->
<scala.version>2.10.4</scala.version>
<scala.binary.version>2.10</scala.binary.version>
<chill.version>0.7.4</chill.version>
<chill.version>0.8.1</chill.version>
<asm.version>5.0.4</asm.version>
<zookeeper.version>3.4.6</zookeeper.version>
<curator.version>2.8.0</curator.version>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册