提交 98f3046f 编写于 作者: T Timo Walther

[FLINK-12254][table] Update TableSchema to new type system

上级 efec5321
......@@ -145,7 +145,7 @@ public abstract class CliResultView<O extends Enum<O>> extends CliView<O, Void>
final CliRowView view = new CliRowView(
client,
resultDescriptor.getResultSchema().getFieldNames(),
CliUtils.typesToString(resultDescriptor.getResultSchema().getFieldTypes()),
CliUtils.typesToString(resultDescriptor.getResultSchema().getFieldDataTypes()),
getRow(results.get(selectedRow)));
view.open(); // enter view
}
......
......@@ -18,8 +18,8 @@
package org.apache.flink.table.client.cli;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.jline.utils.AttributedString;
......@@ -103,7 +103,7 @@ public final class CliUtils {
return fields;
}
public static String[] typesToString(TypeInformation<?>[] types) {
public static String[] typesToString(DataType[] types) {
final String[] typesAsString = new String[types.length];
for (int i = 0; i < types.length; i++) {
typesAsString[i] = types[i].toString();
......
......@@ -20,8 +20,9 @@ package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
......@@ -33,10 +34,17 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.IntStream;
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.api.DataTypes.Field;
import static org.apache.flink.table.api.DataTypes.ROW;
import static org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo;
import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
/**
* A table schema that represents a table's structure with field names and types.
*/
* A table schema that represents a table's structure with field names and data types.
*/
@PublicEvolving
public class TableSchema {
......@@ -44,20 +52,20 @@ public class TableSchema {
private final String[] fieldNames;
private final TypeInformation<?>[] fieldTypes;
private final DataType[] fieldDataTypes;
private final Map<String, Integer> fieldNameToIndex;
public TableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
private TableSchema(String[] fieldNames, DataType[] fieldDataTypes) {
this.fieldNames = Preconditions.checkNotNull(fieldNames);
this.fieldTypes = Preconditions.checkNotNull(fieldTypes);
this.fieldDataTypes = Preconditions.checkNotNull(fieldDataTypes);
if (fieldNames.length != fieldTypes.length) {
if (fieldNames.length != fieldDataTypes.length) {
throw new TableException(
"Number of field names and field types must be equal.\n" +
"Number of names is " + fieldNames.length + ", number of types is " + fieldTypes.length + ".\n" +
"List of field names: " + Arrays.toString(fieldNames) + "\n" +
"List of field types: " + Arrays.toString(fieldTypes));
"Number of field names and field data types must be equal.\n" +
"Number of names is " + fieldNames.length + ", number of data types is " + fieldDataTypes.length + ".\n" +
"List of field names: " + Arrays.toString(fieldNames) + "\n" +
"List of field data types: " + Arrays.toString(fieldDataTypes));
}
// validate and create name to index mapping
......@@ -66,7 +74,7 @@ public class TableSchema {
final Set<String> uniqueNames = new HashSet<>();
for (int i = 0; i < fieldNames.length; i++) {
// check for null
Preconditions.checkNotNull(fieldTypes[i]);
Preconditions.checkNotNull(fieldDataTypes[i]);
final String fieldName = Preconditions.checkNotNull(fieldNames[i]);
// collect indices
......@@ -82,49 +90,83 @@ public class TableSchema {
if (!duplicateNames.isEmpty()) {
throw new TableException(
"Field names must be unique.\n" +
"List of duplicate fields: " + duplicateNames.toString() + "\n" +
"List of all fields: " + Arrays.toString(fieldNames));
"List of duplicate fields: " + duplicateNames.toString() + "\n" +
"List of all fields: " + Arrays.toString(fieldNames));
}
}
/**
* @deprecated Use the {@link Builder} instead.
*/
@Deprecated
public TableSchema(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
this(fieldNames, fromLegacyInfoToDataType(fieldTypes));
}
/**
* Returns a deep copy of the table schema.
*/
public TableSchema copy() {
return new TableSchema(fieldNames.clone(), fieldTypes.clone());
return new TableSchema(fieldNames.clone(), fieldDataTypes.clone());
}
/**
* Returns all field type information as an array.
* Returns all field data types as an array.
*/
public DataType[] getFieldDataTypes() {
return fieldDataTypes;
}
/**
* @deprecated Use {@link #getFieldDataTypes()} instead.
*/
@Deprecated
public TypeInformation<?>[] getFieldTypes() {
return fieldTypes;
return fromDataTypeToLegacyInfo(fieldDataTypes);
}
/**
* Returns the specified type information for the given field index.
* Returns the specified data type for the given field index.
*
* @param fieldIndex the index of the field
*/
public Optional<TypeInformation<?>> getFieldType(int fieldIndex) {
if (fieldIndex < 0 || fieldIndex >= fieldTypes.length) {
public Optional<DataType> getFieldDataType(int fieldIndex) {
if (fieldIndex < 0 || fieldIndex >= fieldDataTypes.length) {
return Optional.empty();
}
return Optional.of(fieldTypes[fieldIndex]);
return Optional.of(fieldDataTypes[fieldIndex]);
}
/**
* Returns the specified type information for the given field name.
* @deprecated Use {@link #getFieldDataType(int)}} instead.
*/
@Deprecated
public Optional<TypeInformation<?>> getFieldType(int fieldIndex) {
return getFieldDataType(fieldIndex)
.map(TypeConversions::fromDataTypeToLegacyInfo);
}
/**
* Returns the specified data type for the given field name.
*
* @param fieldName the name of the field
*/
public Optional<TypeInformation<?>> getFieldType(String fieldName) {
public Optional<DataType> getFieldDataType(String fieldName) {
if (fieldNameToIndex.containsKey(fieldName)) {
return Optional.of(fieldTypes[fieldNameToIndex.get(fieldName)]);
return Optional.of(fieldDataTypes[fieldNameToIndex.get(fieldName)]);
}
return Optional.empty();
}
/**
* @deprecated Use {@link #getFieldDataType(String)} instead.
*/
@Deprecated
public Optional<TypeInformation<?>> getFieldType(String fieldName) {
return getFieldDataType(fieldName)
.map(TypeConversions::fromDataTypeToLegacyInfo);
}
/**
* Returns the number of fields.
*/
......@@ -152,10 +194,22 @@ public class TableSchema {
}
/**
* Converts a table schema into a (nested) type information describing a {@link Row}.
* Converts a table schema into a (nested) data type describing a {@link DataTypes#ROW(Field...)}.
*/
public DataType toRowDataType() {
final Field[] fields = IntStream.range(0, fieldDataTypes.length)
.mapToObj(i -> FIELD(fieldNames[i], fieldDataTypes[i]))
.toArray(Field[]::new);
return ROW(fields);
}
/**
* @deprecated Use {@link #toRowDataType()} instead.
*/
@Deprecated
@SuppressWarnings("unchecked")
public TypeInformation<Row> toRowType() {
return Types.ROW_NAMED(fieldNames, fieldTypes);
return (TypeInformation<Row>) fromDataTypeToLegacyInfo(toRowDataType());
}
@Override
......@@ -163,7 +217,7 @@ public class TableSchema {
final StringBuilder sb = new StringBuilder();
sb.append("root\n");
for (int i = 0; i < fieldNames.length; i++) {
sb.append(" |-- ").append(fieldNames[i]).append(": ").append(fieldTypes[i]).append('\n');
sb.append(" |-- ").append(fieldNames[i]).append(": ").append(fieldDataTypes[i]).append('\n');
}
return sb.toString();
}
......@@ -178,13 +232,13 @@ public class TableSchema {
}
TableSchema schema = (TableSchema) o;
return Arrays.equals(fieldNames, schema.fieldNames) &&
Arrays.equals(fieldTypes, schema.fieldTypes);
Arrays.equals(fieldDataTypes, schema.fieldDataTypes);
}
@Override
public int hashCode() {
int result = Arrays.hashCode(fieldNames);
result = 31 * result + Arrays.hashCode(fieldTypes);
result = 31 * result + Arrays.hashCode(fieldDataTypes);
return result;
}
......@@ -196,7 +250,10 @@ public class TableSchema {
*
* @param typeInfo The {@link TypeInformation} from which the table schema is generated.
* @return The table schema that was generated from the given {@link TypeInformation}.
*
* @deprecated This method will be removed soon. Use {@link DataTypes} to declare types.
*/
@Deprecated
public static TableSchema fromTypeInfo(TypeInformation<?> typeInfo) {
if (typeInfo instanceof CompositeType<?>) {
final CompositeType<?> compositeType = (CompositeType<?>) typeInfo;
......@@ -228,32 +285,41 @@ public class TableSchema {
private List<String> fieldNames;
private List<TypeInformation<?>> fieldTypes;
private List<DataType> fieldDataTypes;
public Builder() {
fieldNames = new ArrayList<>();
fieldTypes = new ArrayList<>();
fieldDataTypes = new ArrayList<>();
}
/**
* Add a field with name and type. The call order of this method determines the order
* of fields in the schema.
* Add a field with name and data type.
*
* <p>The call order of this method determines the order of fields in the schema.
*/
public Builder field(String name, TypeInformation<?> type) {
public Builder field(String name, DataType dataType) {
Preconditions.checkNotNull(name);
Preconditions.checkNotNull(type);
Preconditions.checkNotNull(dataType);
fieldNames.add(name);
fieldTypes.add(type);
fieldDataTypes.add(dataType);
return this;
}
/**
* @deprecated Use {@link #field(String, DataType)} instead.
*/
@Deprecated
public Builder field(String name, TypeInformation<?> typeInfo) {
return field(name, fromLegacyInfoToDataType(typeInfo));
}
/**
* Returns a {@link TableSchema} instance.
*/
public TableSchema build() {
return new TableSchema(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new TypeInformation<?>[0]));
fieldDataTypes.toArray(new DataType[0]));
}
}
}
......@@ -39,8 +39,8 @@ class TableSchemaTest extends TableTestBase {
assertEquals(Types.STRING, schema.getFieldTypes.apply(1))
val expectedString = "root\n" +
" |-- a: Integer\n" +
" |-- b: String\n"
" |-- a: INT\n" +
" |-- b: STRING\n"
assertEquals(expectedString, schema.toString)
assertTrue(!schema.getFieldName(3).isPresent)
......@@ -61,8 +61,8 @@ class TableSchemaTest extends TableTestBase {
assertEquals(Types.STRING, schema.getFieldTypes.apply(1))
val expectedString = "root\n" +
" |-- a: Integer\n" +
" |-- b: String\n"
" |-- a: INT\n" +
" |-- b: STRING\n"
assertEquals(expectedString, schema.toString)
assertTrue(!schema.getFieldName(3).isPresent)
......
......@@ -28,10 +28,10 @@ class TableSchemaValidationTest extends TableTestBase {
def testColumnNameAndColumnTypeNotEqual() {
thrown.expect(classOf[TableException])
thrown.expectMessage(
"Number of field names and field types must be equal.\n" +
"Number of names is 3, number of types is 2.\n" +
"Number of field names and field data types must be equal.\n" +
"Number of names is 3, number of data types is 2.\n" +
"List of field names: [a, b, c]\n" +
"List of field types: [Integer, String]")
"List of field data types: [INT, STRING]")
val fieldNames = Array("a", "b", "c")
val typeInfos: Array[TypeInformation[_]] = Array(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册