Commit d554faa3 authored by Stephan Ewen

[FLINK-1273] [runtime] Add Void type to basic types

Add optional test for external sorting of case classes.
Fix various warnings.
Parent 17bc479c
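The diff below registers Void in the basic-type lookup table. As a quick orientation, here is a minimal sketch (not part of the commit) of what that enables, assuming the static lookup method whose body appears in the BasicTypeInfo hunk is getInfoFor, as in the public Flink API:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

public class VoidBasicTypeSketch {
    public static void main(String[] args) {
        // Both the boxed and the primitive form of void resolve to the new constant
        // once the TYPES map registers Void.class and void.class (see the last hunk).
        BasicTypeInfo<Void> boxed = BasicTypeInfo.getInfoFor(Void.class);
        BasicTypeInfo<Void> primitive = BasicTypeInfo.getInfoFor(void.class);
        System.out.println(boxed == BasicTypeInfo.VOID_TYPE_INFO);      // true
        System.out.println(primitive == BasicTypeInfo.VOID_TYPE_INFO);  // true
    }
}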
@@ -23,6 +23,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.BooleanComparator;
@@ -45,10 +46,11 @@ import org.apache.flink.api.common.typeutils.base.ShortComparator;
import org.apache.flink.api.common.typeutils.base.ShortSerializer;
import org.apache.flink.api.common.typeutils.base.StringComparator;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
/**
*
* Type information for primitive types (int, long, double, byte, ...), String, Date, and Void.
*/
public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
@@ -62,6 +64,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
public static final BasicTypeInfo<Double> DOUBLE_TYPE_INFO = new BasicTypeInfo<Double>(Double.class, DoubleSerializer.INSTANCE, DoubleComparator.class);
public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, CharSerializer.INSTANCE, CharComparator.class);
public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<Date>(Date.class, DateSerializer.INSTANCE, DateComparator.class);
public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<Void>(Void.class, VoidSerializer.INSTANCE, null);
// --------------------------------------------------------------------------------------------
@@ -117,7 +120,11 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
@Override
public TypeComparator<T> createComparator(boolean sortOrderAscending) {
return instantiateComparator(comparatorClass, sortOrderAscending);
if (comparatorClass != null) {
return instantiateComparator(comparatorClass, sortOrderAscending);
} else {
throw new InvalidTypesException("The type " + clazz.getSimpleName() + " cannot be used as a key.");
}
}
// --------------------------------------------------------------------------------------------
@@ -150,6 +157,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
throw new NullPointerException();
}
@SuppressWarnings("unchecked")
BasicTypeInfo<X> info = (BasicTypeInfo<X>) TYPES.get(type);
return info;
}
@@ -185,5 +193,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
TYPES.put(Character.class, CHAR_TYPE_INFO);
TYPES.put(char.class, CHAR_TYPE_INFO);
TYPES.put(Date.class, DATE_TYPE_INFO);
TYPES.put(Void.class, VOID_TYPE_INFO);
TYPES.put(void.class, VOID_TYPE_INFO);
}
}
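Since VOID_TYPE_INFO is constructed with a null comparator class, the new guard in createComparator turns an attempt to key on Void into an InvalidTypesException instead of a NullPointerException. A minimal sketch (not part of the commit) of that behavior:

import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

public class VoidKeyGuardSketch {
    public static void main(String[] args) {
        try {
            // VOID_TYPE_INFO carries no comparator class, so creating a comparator
            // for it now fails fast with a descriptive exception.
            BasicTypeInfo.VOID_TYPE_INFO.createComparator(true);
        } catch (InvalidTypesException e) {
            System.out.println(e.getMessage()); // "The type Void cannot be used as a key."
        }
    }
}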
@@ -79,7 +79,7 @@ public final class EnumComparator<T extends Enum<T>> extends BasicTypeComparator
}
@Override
public EnumComparator duplicate() {
return new EnumComparator(ascendingComparison);
public EnumComparator<T> duplicate() {
return new EnumComparator<T>(ascendingComparison);
}
}
@@ -93,7 +93,7 @@ public final class EnumSerializer<T extends Enum<T>> extends TypeSerializer<T> {
@Override
public boolean equals(Object obj) {
if(obj instanceof EnumSerializer) {
EnumSerializer other = (EnumSerializer) obj;
EnumSerializer<?> other = (EnumSerializer<?>) obj;
return other.enumClass == this.enumClass;
} else {
return false;
......
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for arrays of objects.
*
@@ -163,4 +162,9 @@ public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
return false;
}
}
@Override
public String toString() {
return "Serializer " + componentClass.getName() + "[]";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public final class VoidSerializer extends TypeSerializerSingleton<Void> {
private static final long serialVersionUID = 1L;
public static final VoidSerializer INSTANCE = new VoidSerializer();
@Override
public boolean isImmutableType() {
return true;
}
@Override
public boolean isStateful() {
return false;
}
@Override
public Void createInstance() {
return null;
}
@Override
public Void copy(Void from) {
return null;
}
@Override
public Void copy(Void from, Void reuse) {
return null;
}
@Override
public int getLength() {
return 1;
}
@Override
public void serialize(Void record, DataOutputView target) throws IOException {
// make progress in the stream, write one byte
target.write(0);
}
@Override
public Void deserialize(DataInputView source) throws IOException {
source.readByte();
return null;
}
@Override
public Void deserialize(Void reuse, DataInputView source) throws IOException {
source.readByte();
return null;
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source.readByte());
}
}
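Void has null as its only value, so the serializer above writes a single placeholder byte purely to advance the stream. A minimal contract check (not part of the commit), limited to the members defined above so that no DataInputView/DataOutputView setup is needed:

import org.apache.flink.api.common.typeutils.base.VoidSerializer;

public class VoidSerializerContractSketch {
    public static void main(String[] args) {
        VoidSerializer s = VoidSerializer.INSTANCE;
        // Fixed-length encoding: exactly one byte per record on the wire.
        System.out.println(s.getLength() == 1);           // true
        // null is the only value of type Void, so instances and copies are null.
        System.out.println(s.createInstance() == null);   // true
        System.out.println(s.copy((Void) null) == null);  // true
        System.out.println(s.isImmutableType());          // true
    }
}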
@@ -179,7 +179,7 @@ public abstract class SequentialFormatTestBase<T> {
Configuration configuration = new Configuration();
configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize);
if (this.degreeOfParallelism == 1) {
BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI().toString(),
BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI().toString(),
configuration);
for (int index = 0; index < this.numberOfTuples; index++) {
output.writeRecord(this.getRecord(index));
@@ -190,7 +190,7 @@ public abstract class SequentialFormatTestBase<T> {
this.tempFile.mkdir();
int recordIndex = 0;
for (int fileIndex = 0; fileIndex < this.degreeOfParallelism; fileIndex++) {
BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI() + "/" +
BinaryOutputFormat<T> output = createOutputFormat(this.tempFile.toURI() + "/" +
(fileIndex+1), configuration);
for (int fileCount = 0; fileCount < this.getNumberOfTuplesPerFile(fileIndex); fileCount++, recordIndex++) {
output.writeRecord(this.getRecord(recordIndex));
......
@@ -57,7 +57,7 @@ public class SerializedFormatTest extends SequentialFormatTestBase<Record> {
}
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
protected BinaryOutputFormat<Record> createOutputFormat(String path, Configuration
configuration) throws IOException {
......
@@ -163,7 +163,7 @@ under the License.
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.4</version>
<version>2.4</version><!--$NO-MVN-MAN-VER$-->
<executions>
<!-- Uber-jar -->
<execution>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.scala.misc
import java.io.File
import java.util.Random
import java.io.BufferedWriter
import java.io.FileWriter
import org.apache.flink.api.scala._
import java.io.BufferedReader
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync
import java.io.FileReader
import org.apache.flink.util.MutableObjectIterator
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory
import org.junit.Assert._;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
class MassiveCaseClassSortingITCase {
val SEED : Long = 347569784659278346L
def testStringTuplesSorting() {
val NUM_STRINGS = 3000000
var input: File = null
var sorted: File = null
try {
input = generateFileWithStringTuples(NUM_STRINGS,
"http://some-uri.com/that/is/a/common/prefix/to/all")
sorted = File.createTempFile("sorted_strings", "txt")
val command = Array("/bin/bash", "-c", "export LC_ALL=\"C\" && cat \""
+ input.getAbsolutePath + "\" | sort > \"" + sorted.getAbsolutePath + "\"")
var p: Process = null
try {
p = Runtime.getRuntime.exec(command)
val retCode = p.waitFor()
if (retCode != 0) {
throw new Exception("Command failed with return code " + retCode)
}
p = null
}
finally {
if (p != null) {
p.destroy()
}
}
var sorter: UnilateralSortMerger[StringTuple] = null
var reader: BufferedReader = null
var verifyReader: BufferedReader = null
try {
reader = new BufferedReader(new FileReader(input))
val inputIterator = new StringTupleReader(reader)
val typeInfo = implicitly[TypeInformation[StringTuple]]
.asInstanceOf[CompositeType[StringTuple]];
val serializer = typeInfo.createSerializer()
val comparator = typeInfo.createComparator(Array(0, 1), Array(true, true), 0)
val mm = new DefaultMemoryManager(1024 * 1024, 1)
val ioMan = new IOManagerAsync()
sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator,
new DummyInvokable(),
new RuntimeStatelessSerializerFactory[StringTuple](serializer, classOf[StringTuple]),
comparator, 1.0, 4, 0.8f)
val sortedData = sorter.getIterator
reader.close()
verifyReader = new BufferedReader(new FileReader(sorted))
val verifyIterator = new StringTupleReader(verifyReader)
var num = 0
var hasMore = true;
while (hasMore) {
val next = verifyIterator.next(null)
if (next != null ) {
num += 1
val nextFromFlinkSort = sortedData.next(null)
assertNotNull(nextFromFlinkSort)
assertEquals(next.key1, nextFromFlinkSort.key1)
assertEquals(next.key2, nextFromFlinkSort.key2)
// assert array equals does not work here
assertEquals(next.value.length, nextFromFlinkSort.value.length)
for (i <- 0 until next.value.length) {
assertEquals(next.value(i), nextFromFlinkSort.value(i))
}
}
else {
hasMore = false
}
}
assertNull(sortedData.next(null))
assertEquals(NUM_STRINGS, num);
}
finally {
if (reader != null) {
reader.close()
}
if (verifyReader != null) {
verifyReader.close()
}
if (sorter != null) {
sorter.close()
}
}
}
catch {
case e: Exception => {
System.err.println(e.getMessage)
e.printStackTrace()
e.getMessage
}
}
finally {
if (input != null) {
input.delete()
}
if (sorted != null) {
sorted.delete()
}
}
}
private def generateFileWithStringTuples(numStrings: Int, prefix: String): File = {
val rnd = new Random(SEED)
val bld = new StringBuilder()
val f = File.createTempFile("strings", "txt")
var wrt: BufferedWriter = null
try {
wrt = new BufferedWriter(new FileWriter(f))
for (i <- 0 until numStrings) {
bld.setLength(0)
val numComps = rnd.nextInt(5) + 2
for (z <- 0 until numComps) {
if (z > 0) {
bld.append(' ')
}
bld.append(prefix)
val len = rnd.nextInt(20) + 10
for (k <- 0 until len) {
val c = (rnd.nextInt(80) + 40).toChar
bld.append(c)
}
}
val str = bld.toString
wrt.write(str)
wrt.newLine()
}
}
finally {
wrt.close()
}
f
}
}
object MassiveCaseClassSortingITCase {
def main(args: Array[String]) {
new MassiveCaseClassSortingITCase().testStringTuplesSorting;
}
}
case class StringTuple(key1: String, key2: String, value: Array[String])
class StringTupleReader(val reader: BufferedReader) extends MutableObjectIterator[StringTuple] {
override def next(reuse: StringTuple): StringTuple = {
val line = reader.readLine()
if (line == null) {
return null
}
val parts = line.split(" ")
StringTuple(parts(0), parts(1), parts)
}
}
class DummyInvokable extends AbstractInvokable {
override def registerInputOutput() = {}
override def invoke() = {}
}