Commit edbf8c9e authored by uybhatti, committed by Fabian Hueske

[FLINK-2170] [connectors] Add OrcRowInputFormat and OrcTableSource.

This closes #4670.
Parent 5017c679
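
This commit adds the Maven module definition, the OrcTableSource and RowOrcInputFormat sources, their tests, a test logging configuration, and the registration of the new module in the flink-connectors parent pom. As a quick orientation, here is a minimal, hypothetical usage sketch of the new table source in a batch Table API program; the file path, schema string, class name, and variable names are illustrative only and are not part of the commit:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.orc.OrcTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class OrcTableSourceUsage {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // Path and schema are placeholders; the schema uses the ORC TypeDescription syntax.
        OrcTableSource orcSrc = new OrcTableSource(
            "/path/to/file.orc", "struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>");

        tEnv.registerTableSource("orcTable", orcSrc);
        Table result = tEnv.sql("SELECT * FROM orcTable");

        DataSet<Row> rows = tEnv.toDataSet(result, Row.class);
        rows.print();
    }
}
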
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connectors</artifactId>
<version>1.4-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-orc_${scala.binary.version}</artifactId>
<name>flink-orc</name>
<packaging>jar</packaging>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
<version>1.4.0</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<versionRange>[2.4,)</versionRange>
<goals>
<goal>single</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-clean-plugin</artifactId>
<versionRange>[1,)</versionRange>
<goals>
<goal>clean</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<versionRange>[1.7.7,)</versionRange>
<goals>
<goal>schema</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.orc;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.sources.BatchTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.TypeDescription;
/**
* Creates a TableSource to read an ORC file.
*
* <p>The ORC file path and schema are passed during {@link OrcTableSource} construction. The Hadoop configuration is optional.
*
* <p>The OrcTableSource is used as shown in the example below.
*
* <pre>
* {@code
* String path = testInputURL.getPath();
* String schema = "struct<col1:boolean,col2:tinyint,col3:smallint,col4:int>";
* OrcTableSource orcSrc = new OrcTableSource(path, schema);
* tEnv.registerTableSource("orcTable", orcSrc);
* Table res = tEnv.sql("SELECT * FROM orcTable");
* }
* </pre>
*/
public class OrcTableSource implements BatchTableSource<Row>, ProjectableTableSource<Row> {
private String path;
private TypeDescription orcSchema;
private RowTypeInfo typeInfo;
private Configuration orcConfig;
private int[] fieldMapping;
/**
* Creates an OrcTableSource from an ORC file path and schema string, using a default Hadoop configuration.
*
* @param path the path of the ORC file
* @param orcSchema the schema of the ORC file
*/
public OrcTableSource(String path, String orcSchema) {
this(path, orcSchema, new Configuration());
}
/**
* Creates an OrcTableSource from an ORC file path, a schema string, and a Hadoop configuration for reading the file.
*
* @param path the path of the ORC file
* @param orcSchema the schema of the ORC file
* @param orcConfig the Hadoop configuration used to read the ORC file
*/
public OrcTableSource(String path, String orcSchema, Configuration orcConfig) {
this(path, TypeDescription.fromString(orcSchema), orcConfig);
}
public OrcTableSource(String path, TypeDescription orcSchema, Configuration orcConfig) {
this.path = path;
this.orcSchema = orcSchema;
this.orcConfig = orcConfig;
this.typeInfo = (RowTypeInfo) OrcUtils.schemaToTypeInfo(this.orcSchema);
}
@Override
public DataSet<Row> getDataSet(ExecutionEnvironment execEnv) {
RowOrcInputFormat orcIF = new RowOrcInputFormat(path, orcSchema, orcConfig);
if (fieldMapping != null) {
orcIF.setFieldMapping(fieldMapping);
}
return execEnv.createInput(orcIF);
}
@Override
public TypeInformation<Row> getReturnType() {
return typeInfo;
}
@Override
public TableSource<Row> projectFields(int[] fields) {
OrcTableSource copy = new OrcTableSource(path, orcSchema, orcConfig);
// set field mapping
copy.fieldMapping = fields;
// adapt TypeInfo
TypeInformation[] fieldTypes = new TypeInformation[fields.length];
String[] fieldNames = new String[fields.length];
for (int i = 0; i < fields.length; i++) {
fieldTypes[i] = this.typeInfo.getTypeAt(fields[i]);
fieldNames[i] = this.typeInfo.getFieldNames()[fields[i]];
}
copy.typeInfo = new RowTypeInfo(fieldTypes, fieldNames);
return copy;
}
@Override
public String explainSource() {
return "ORC Source file at path " + this.path + " with schema " + this.orcSchema;
}
}
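
OrcTableSource also implements ProjectableTableSource, so the Table API planner can push column projections into the source via projectFields() as defined above. A hypothetical sketch of what that call does; the path and schema are illustrative, and in a real program the planner invokes projectFields() itself when a query selects only a subset of columns:

// Illustrative only: a three-column schema and a local path.
String schema = "struct<a:int,b:string,c:double>";
OrcTableSource src = new OrcTableSource("/path/to/data.orc", schema);

// Keep only columns c (index 2) and a (index 0); the copy narrows its RowTypeInfo
// and forwards the same field mapping to RowOrcInputFormat in getDataSet().
OrcTableSource projected = (OrcTableSource) src.projectFields(new int[]{2, 0});
System.out.println(projected.getReturnType()); // Row(c: Double, a: Integer)
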
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.orc;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import static org.apache.flink.orc.OrcUtils.fillRows;
/**
* InputFormat to read ORC data.
* For efficiency, rows are read and decoded in batches rather than one at a time.
*/
public class RowOrcInputFormat
extends FileInputFormat<Row>
implements ResultTypeQueryable<Row> {
private static final Logger LOG = LoggerFactory.getLogger(RowOrcInputFormat.class);
private static final int BATCH_SIZE = 1024;
private org.apache.hadoop.conf.Configuration config;
private TypeDescription schema;
private int[] fieldMapping;
private transient RowTypeInfo rowType;
private transient RecordReader orcRowsReader;
private transient VectorizedRowBatch rowBatch;
private transient Row[] rows;
private transient int rowInBatch;
public RowOrcInputFormat(String path, String schemaString, Configuration orcConfig) {
this(path, TypeDescription.fromString(schemaString), orcConfig);
}
public RowOrcInputFormat(String path, TypeDescription orcSchema, Configuration orcConfig) {
super(new Path(path));
this.unsplittable = false;
this.schema = orcSchema;
this.rowType = (RowTypeInfo) OrcUtils.schemaToTypeInfo(schema);
this.config = orcConfig;
this.fieldMapping = new int[this.schema.getChildren().size()];
for (int i = 0; i < fieldMapping.length; i++) {
this.fieldMapping[i] = i;
}
}
public void setFieldMapping(int[] fieldMapping) {
this.fieldMapping = fieldMapping;
// adapt result type
TypeInformation[] fieldTypes = new TypeInformation[fieldMapping.length];
String[] fieldNames = new String[fieldMapping.length];
for (int i = 0; i < fieldMapping.length; i++) {
fieldTypes[i] = this.rowType.getTypeAt(fieldMapping[i]);
fieldNames[i] = this.rowType.getFieldNames()[fieldMapping[i]];
}
this.rowType = new RowTypeInfo(fieldTypes, fieldNames);
}
private boolean[] computeProjectionMask() {
boolean[] projectionMask = new boolean[schema.getMaximumId() + 1];
for (int inIdx : fieldMapping) {
TypeDescription fieldSchema = schema.getChildren().get(inIdx);
for (int i = fieldSchema.getId(); i <= fieldSchema.getMaximumId(); i++) {
projectionMask[i] = true;
}
}
return projectionMask;
}
@Override
public void openInputFormat() throws IOException {
super.openInputFormat();
this.rows = new Row[BATCH_SIZE];
for (int i = 0; i < BATCH_SIZE; i++) {
rows[i] = new Row(fieldMapping.length);
}
}
@Override
public void open(FileInputSplit fileSplit) throws IOException {
this.currentSplit = fileSplit;
Preconditions.checkArgument(this.splitStart == 0, "ORC files must be read from the start.");
if (LOG.isDebugEnabled()) {
LOG.debug("Opening ORC file " + fileSplit.getPath());
}
org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(fileSplit.getPath().getPath());
Reader orcReader = OrcFile.createReader(hPath, OrcFile.readerOptions(config));
Reader.Options options = orcReader.options()
.range(fileSplit.getStart(), fileSplit.getLength())
.useZeroCopy(OrcConf.USE_ZEROCOPY.getBoolean(config))
.skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(config))
.tolerateMissingSchema(OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(config));
options.include(computeProjectionMask());
// check that schema of file is as expected
if (!this.schema.equals(orcReader.getSchema())) {
throw new RuntimeException("Invalid schema for file at " + this.filePath +
" Expected:" + this.schema + " Actual: " + orcReader.getSchema());
}
this.orcRowsReader = orcReader.rows(options);
// assign ids
this.schema.getId();
this.rowBatch = schema.createRowBatch(BATCH_SIZE);
rowInBatch = 0;
}
@Override
public void close() throws IOException {
if (orcRowsReader != null) {
this.orcRowsReader.close();
}
this.orcRowsReader = null;
}
@Override
public void closeInputFormat() throws IOException {
this.rows = null;
this.schema = null;
this.rowBatch = null;
}
@Override
public boolean reachedEnd() throws IOException {
return !ensureBatch();
}
private boolean ensureBatch() throws IOException {
if (rowInBatch >= rowBatch.size) {
rowInBatch = 0;
boolean moreRows = orcRowsReader.nextBatch(rowBatch);
if (moreRows) {
// read rows
fillRows(rows, schema, rowBatch, fieldMapping);
}
return moreRows;
}
return true;
}
@Override
public Row nextRecord(Row reuse) throws IOException {
return rows[this.rowInBatch++];
}
@Override
public TypeInformation<Row> getProducedType() {
return rowType;
}
// --------------------------------------------------------------------------------------------
// Custom serialization methods
// --------------------------------------------------------------------------------------------
private void writeObject(ObjectOutputStream out) throws IOException {
this.config.write(out);
out.writeUTF(schema.toString());
out.writeInt(fieldMapping.length);
for (int f : fieldMapping) {
out.writeInt(f);
}
}
@SuppressWarnings("unchecked")
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
configuration.readFields(in);
if (this.config == null) {
this.config = configuration;
}
this.schema = TypeDescription.fromString(in.readUTF());
this.fieldMapping = new int[in.readInt()];
for (int i = 0; i < fieldMapping.length; i++) {
this.fieldMapping[i] = in.readInt();
}
}
}
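
RowOrcInputFormat can also be used on its own as a plain DataSet source, which is essentially what OrcTableSource.getDataSet() does internally. A minimal, hypothetical sketch; the class name, path, schema, and field mapping are illustrative:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.orc.RowOrcInputFormat;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;

public class OrcDataSetExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The schema string must match the file's schema exactly; otherwise open()
        // throws a RuntimeException (see the schema check in open() above).
        RowOrcInputFormat orcIF = new RowOrcInputFormat(
            "/path/to/data.orc", "struct<id:int,name:string>", new Configuration());

        // Optionally read only a subset of the columns, here just "name".
        orcIF.setFieldMapping(new int[]{1});

        DataSet<Row> rows = env.createInput(orcIF);
        rows.print();
    }
}
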
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.orc;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.Row;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
/**
* Tests for {@link OrcTableSource}.
*/
public class OrcTableSourceITCase extends MultipleProgramsTestBase {
private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
"long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
"middle:struct<list:array<struct<int1:int,string1:string>>>," +
"list:array<struct<int1:int,string1:string>>," +
"map:map<string,struct<int1:int,string1:string>>>";
private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
private static final String[] TEST1_DATA = new String[] {
"false,1,1024,65536,9223372036854775807,1.0,-15.0,[0, 1, 2, 3, 4],hi,[1,bye, 2,sigh],[3,good, 4,bad],{}",
"true,100,2048,65536,9223372036854775807,2.0,-5.0,[],bye,[1,bye, 2,sigh]," +
"[100000000,cat, -100000,in, 1234,hat],{chani=5,chani, mauddib=1,mauddib}" };
public OrcTableSourceITCase() {
super(TestExecutionMode.COLLECTION);
}
@Test
public void testOrcTableSource() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
assert (test1URL != null);
OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
tEnv.registerTableSource("orcTable", orc);
String query = "Select * from orcTable";
Table t = tEnv.sql(query);
DataSet<Row> dataSet = tEnv.toDataSet(t, Row.class);
List<Row> records = dataSet.collect();
Assert.assertEquals(records.size(), 2);
List<String> actualRecords = new ArrayList<>();
for (Row record : records) {
Assert.assertEquals(record.getArity(), 12);
actualRecords.add(record.toString());
}
Assert.assertThat(actualRecords, CoreMatchers.hasItems(TEST1_DATA));
}
@Test
public void testOrcTableProjection() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
assert(test1URL != null);
OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
tEnv.registerTableSource("orcTable", orc);
String query = "Select middle,list,map from orcTable";
Table t = tEnv.sql(query);
String[] colNames = new String[] {"middle", "list", "map"};
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[] {
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO},
new String[] {"int1", "string1"});
RowTypeInfo structTypeInfo = new RowTypeInfo(
new TypeInformation[] {ObjectArrayTypeInfo.getInfoFor(rowTypeInfo)},
new String[] {"list"});
TypeInformation[] colTypes = new TypeInformation[] {
structTypeInfo,
ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, rowTypeInfo)
};
TableSchema expectedTableSchema = new TableSchema(colNames, colTypes);
Assert.assertArrayEquals(t.getSchema().getColumnNames(), colNames);
Assert.assertArrayEquals(t.getSchema().getTypes(), colTypes);
Assert.assertEquals(expectedTableSchema.toString(), t.getSchema().toString());
DataSet<Row> dataSet = tEnv.toDataSet(t, Row.class);
List<Row> records = dataSet.collect();
Assert.assertEquals(records.size(), 2);
for (Row record: records) {
Assert.assertEquals(record.getArity(), 3);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.orc;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.junit.Assert;
import org.junit.Test;
import java.net.URL;
/**
* Unit Tests for {@link OrcTableSource}.
*/
public class OrcTableSourceTest {
private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
"long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
"middle:struct<list:array<struct<int1:int,string1:string>>>," +
"list:array<struct<int1:int,string1:string>>," +
"map:map<string,struct<int1:int,string1:string>>>";
private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
@Test
public void testOrcSchema() throws Exception {
assert(test1URL != null);
OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
String expectedSchema = "Row(boolean1: Boolean, byte1: Byte, short1: Short, int1: Integer, long1: Long, " +
"float1: Float, double1: Double, bytes1: byte[], string1: String, " +
"middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>), " +
"list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>, " +
"map: Map<String, Row(int1: Integer, string1: String)>)";
Assert.assertEquals(expectedSchema, orc.getReturnType().toString());
}
@Test
public void testOrcTableSchema() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
assert(test1URL != null);
OrcTableSource orc = new OrcTableSource(test1URL.getPath(), TEST1_SCHEMA);
tEnv.registerTableSource("orcTable", orc);
String query = "Select * from orcTable";
Table t = tEnv.sql(query);
String[] colNames = new String[] {
"boolean1", "byte1", "short1", "int1", "long1", "float1",
"double1", "bytes1", "string1", "list", "list0", "map"
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(
new TypeInformation[] {
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO},
new String[] {"int1", "string1"});
TypeInformation[] colTypes = new TypeInformation[] {
BasicTypeInfo.BOOLEAN_TYPE_INFO,
BasicTypeInfo.BYTE_TYPE_INFO,
BasicTypeInfo.SHORT_TYPE_INFO,
BasicTypeInfo.INT_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.FLOAT_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
ObjectArrayTypeInfo.getInfoFor(rowTypeInfo),
new MapTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, rowTypeInfo)
};
TableSchema expectedTableSchema = new TableSchema(colNames, colTypes);
Assert.assertArrayEquals(t.getSchema().getColumnNames(), colNames);
Assert.assertArrayEquals(t.getSchema().getTypes(), colTypes);
Assert.assertEquals(expectedTableSchema.toString(), t.getSchema().toString());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.orc;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* Tests for the {@link RowOrcInputFormat}.
*/
public class RowOrcInputFormatTest {
private RowOrcInputFormat rowOrcInputFormat;
@After
public void tearDown() throws IOException {
if (rowOrcInputFormat != null) {
rowOrcInputFormat.close();
rowOrcInputFormat.closeInputFormat();
}
rowOrcInputFormat = null;
}
private final URL test1URL = getClass().getClassLoader().getResource("TestOrcFile.test1.orc");
private static final String TEST1_SCHEMA = "struct<boolean1:boolean,byte1:tinyint,short1:smallint,int1:int," +
"long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
"middle:struct<list:array<struct<int1:int,string1:string>>>," +
"list:array<struct<int1:int,string1:string>>," +
"map:map<string,struct<int1:int,string1:string>>>";
private static final String[] TEST1_DATA = new String[] {
"false,1,1024,65536,9223372036854775807,1.0,-15.0,[0, 1, 2, 3, 4],hi,[1,bye, 2,sigh],[3,good, 4,bad],{}",
"true,100,2048,65536,9223372036854775807,2.0,-5.0,[],bye,[1,bye, 2,sigh]," +
"[100000000,cat, -100000,in, 1234,hat],{chani=5,chani, mauddib=1,mauddib}" };
private static final String[] TEST1_PROJECTED_DATA = new String[] {
"{},[3,good, 4,bad],[1,bye, 2,sigh],hi,[0, 1, 2, 3, 4],-15.0,1.0,9223372036854775807,65536,1024,1,false",
"{chani=5,chani, mauddib=1,mauddib},[100000000,cat, -100000,in, 1234,hat],[1,bye, 2,sigh],bye," +
"[],-5.0,2.0,9223372036854775807,65536,2048,100,true" };
private static final String TEST1_INVALID_SCHEMA = "struct<boolean1:int,byte1:tinyint,short1:smallint,int1:int," +
"long1:bigint,float1:float,double1:double,bytes1:binary,string1:string," +
"middle:struct<list:array<struct<int1:int,string1:string>>>," +
"list:array<struct<int1:int,string1:string>>," +
"map:map<string,struct<int1:int,string1:string>>>";
@Test(expected = FileNotFoundException.class)
public void testInvalidPath() throws IOException{
rowOrcInputFormat = new RowOrcInputFormat("TestOrcFile.test2.orc", TEST1_SCHEMA, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
rowOrcInputFormat.open(inputSplits[0]);
}
@Test(expected = RuntimeException.class)
public void testInvalidSchema() throws IOException{
assert(test1URL != null);
rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_INVALID_SCHEMA, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
rowOrcInputFormat.open(inputSplits[0]);
}
@Test(expected = IndexOutOfBoundsException.class)
public void testInvalidProjection() throws IOException{
assert(test1URL != null);
rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
int[] projectionMask = {14};
rowOrcInputFormat.setFieldMapping(projectionMask);
}
@Test
public void testMajorDataTypes() throws IOException{
// test for boolean,byte,short,int,long,float,double,bytes,string,struct,list,map
assert(test1URL != null);
rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
Row row = null;
int count = 0;
for (FileInputSplit split : inputSplits) {
rowOrcInputFormat.open(split);
while (!rowOrcInputFormat.reachedEnd()) {
row = rowOrcInputFormat.nextRecord(row);
Assert.assertEquals(row.toString(), TEST1_DATA[count++]);
}
}
}
@Test
public void testProjection() throws IOException{
assert(test1URL != null);
rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
int[] projectionMask = {11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0};
rowOrcInputFormat.setFieldMapping(projectionMask);
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
Row row = null;
int count = 0;
for (FileInputSplit split : inputSplits) {
rowOrcInputFormat.open(split);
while (!rowOrcInputFormat.reachedEnd()) {
row = rowOrcInputFormat.nextRecord(row);
Assert.assertEquals(row.toString(), TEST1_PROJECTED_DATA[count++]);
}
}
}
@Test
public void testTimeStampAndDate() throws IOException{
URL expectedDataURL = getClass().getClassLoader().getResource("TestOrcFile.testDate1900.dat");
assert(expectedDataURL != null);
List<String> expectedTimeStampAndDate = Files.readAllLines(Paths.get(expectedDataURL.getPath()));
URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.testDate1900.orc");
assert(testInputURL != null);
String path = testInputURL.getPath();
String schema = "struct<time:timestamp,date:date>";
rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
List<Object> actualTimeStampAndDate = new ArrayList<>();
Row row = null;
int count = 0;
for (FileInputSplit split : inputSplits) {
rowOrcInputFormat.open(split);
while (!rowOrcInputFormat.reachedEnd()) {
row = rowOrcInputFormat.nextRecord(row);
count++;
if (count <= 10000) {
actualTimeStampAndDate.add(row.getField(0) + "," + row.getField(1));
}
}
}
Assert.assertEquals(count, 70000);
Assert.assertEquals(expectedTimeStampAndDate.size(), actualTimeStampAndDate.size());
Assert.assertEquals(expectedTimeStampAndDate.toString(), actualTimeStampAndDate.toString());
}
@Test
public void testDecimal() throws IOException{
URL expectedDataURL = getClass().getClassLoader().getResource("decimal.dat");
List<String> expectedDecimal = Files.readAllLines(Paths.get(expectedDataURL.getPath()));
URL testInputURL = getClass().getClassLoader().getResource("decimal.orc");
assert(testInputURL != null);
String path = testInputURL.getPath();
String schema = "struct<_col0:decimal(10,5)>";
rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
List<Object> actualDecimal = new ArrayList<>();
Row row = null;
for (FileInputSplit split : inputSplits) {
rowOrcInputFormat.open(split);
while (!rowOrcInputFormat.reachedEnd()) {
row = rowOrcInputFormat.nextRecord(row);
actualDecimal.add(row.getField(0));
}
}
Assert.assertEquals(expectedDecimal.size(), actualDecimal.size());
Assert.assertEquals(expectedDecimal.toString(), actualDecimal.toString());
}
@Test
public void testEmptyFile() throws IOException{
URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.emptyFile.orc");
assert(testInputURL != null);
String path = testInputURL.getPath();
rowOrcInputFormat = new RowOrcInputFormat(path, TEST1_SCHEMA, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
Row row = new Row(1);
int count = 0;
for (FileInputSplit split : inputSplits) {
rowOrcInputFormat.open(split);
while (!rowOrcInputFormat.reachedEnd()) {
row = rowOrcInputFormat.nextRecord(row);
count++;
}
}
Assert.assertEquals(count, 0);
}
@Test
public void testLargeFile() throws IOException{
URL testInputURL = getClass().getClassLoader().getResource("demo-11-none.orc");
assert(testInputURL != null);
String path = testInputURL.getPath();
String schema = "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int," +
"_col5:string,_col6:int,_col7:int,_col8:int>";
rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
Row row = new Row(1);
int count = 0;
for (FileInputSplit split : inputSplits) {
rowOrcInputFormat.open(split);
while (!rowOrcInputFormat.reachedEnd()) {
row = rowOrcInputFormat.nextRecord(row);
count++;
}
}
Assert.assertEquals(count, 1920800);
}
@Test
public void testProducedType() throws IOException{
assert(test1URL != null);
rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
rowOrcInputFormat.open(inputSplits[0]);
TypeInformation<Row> type = rowOrcInputFormat.getProducedType();
Assert.assertEquals(type.toString(), "Row(boolean1: Boolean, byte1: Byte, short1: Short, int1: Integer," +
" long1: Long, float1: Float, double1: Double, bytes1: byte[], string1: String," +
" middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>)," +
" list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>," +
" map: Map<String, Row(int1: Integer, string1: String)>)");
}
@Test
public void testProducedTypeWithProjection() throws IOException{
assert(test1URL != null);
rowOrcInputFormat = new RowOrcInputFormat(test1URL.getPath(), TEST1_SCHEMA, new Configuration());
int[] projectionMask = {9, 10, 11};
rowOrcInputFormat.setFieldMapping(projectionMask);
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
rowOrcInputFormat.open(inputSplits[0]);
TypeInformation<Row> type = rowOrcInputFormat.getProducedType();
Assert.assertEquals(type.toString(), "Row(middle: Row(list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>)," +
" list: ObjectArrayTypeInfo<Row(int1: Integer, string1: String)>," +
" map: Map<String, Row(int1: Integer, string1: String)>)");
}
@Test
public void testLongList() throws Exception {
URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.listlong.orc");
assert(testInputURL != null);
String path = testInputURL.getPath();
String schema = "struct<mylist1:array<bigint>>";
rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
Row row = null;
long count = 0;
for (FileInputSplit split : inputSplits) {
rowOrcInputFormat.open(split);
while (!rowOrcInputFormat.reachedEnd()) {
row = rowOrcInputFormat.nextRecord(row);
Assert.assertEquals(row.getArity(), 1);
Object object = row.getField(0);
long[] l = (long[]) object;
Assert.assertEquals(l.length, 2);
if (count < 50) {
Assert.assertArrayEquals(l, new long[]{count, count + 1});
}
else {
Assert.assertArrayEquals(l, new long[]{0L, 0L});
}
count = count + 2;
}
}
Assert.assertEquals(count, 100);
}
@Test
public void testStringList() throws Exception {
URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.liststring.orc");
assert(testInputURL != null);
String path = testInputURL.getPath();
String schema = "struct<mylist1:array<string>>";
rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
Row row = null;
long count = 0;
for (FileInputSplit split : inputSplits) {
rowOrcInputFormat.open(split);
while (!rowOrcInputFormat.reachedEnd()) {
row = rowOrcInputFormat.nextRecord(row);
Assert.assertEquals(row.getArity(), 1);
Object object = row.getField(0);
String[] l = (String[]) object;
Assert.assertEquals(l.length, 2);
Assert.assertArrayEquals(l, new String[]{"hello" + count, "hello" + (count + 1) });
count = count + 2;
}
}
Assert.assertEquals(count, 200);
}
@Test
public void testListOfListOfStructOfLong() throws Exception {
URL testInputURL = getClass().getClassLoader().getResource("TestOrcFile.listliststructlong.orc");
assert(testInputURL != null);
String path = testInputURL.getPath();
String schema = "struct<mylist1:array<array<struct<mylong1:bigint>>>>";
rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(1);
Assert.assertEquals(inputSplits.length, 1);
Row row = null;
long count = 0;
for (FileInputSplit split : inputSplits) {
rowOrcInputFormat.open(split);
while (!rowOrcInputFormat.reachedEnd()) {
row = rowOrcInputFormat.nextRecord(row);
Assert.assertEquals(row.getArity(), 1);
Object[] objects = (Object[]) row.getField(0);
Assert.assertEquals(objects.length, 1);
Object[] objects1 = (Object[]) objects[0];
Assert.assertEquals(objects1.length, 1);
Row[] nestedRows = Arrays.copyOf(objects1, objects1.length, Row[].class);
Assert.assertEquals(nestedRows.length, 1);
Assert.assertEquals(nestedRows[0].getArity(), 1);
Assert.assertEquals(nestedRows[0].getField(0), count);
count++;
}
}
Assert.assertEquals(count, 100);
}
@Test
public void testSplit() throws IOException{
URL testInputURL = getClass().getClassLoader().getResource("demo-11-none.orc");
assert(testInputURL != null);
String path = testInputURL.getPath();
String schema = "struct<_col0:int,_col1:string,_col2:string,_col3:string,_col4:int," +
"_col5:string,_col6:int,_col7:int,_col8:int>";
rowOrcInputFormat = new RowOrcInputFormat(path, schema, new Configuration());
rowOrcInputFormat.openInputFormat();
FileInputSplit[] inputSplits = rowOrcInputFormat.createInputSplits(10);
Assert.assertEquals(inputSplits.length, 10);
Row row = null;
int countTotalRecords = 0;
for (FileInputSplit split : inputSplits) {
rowOrcInputFormat.open(split);
int countSplitRecords = 0;
while (!rowOrcInputFormat.reachedEnd()) {
row = rowOrcInputFormat.nextRecord(row);
countSplitRecords++;
}
Assert.assertNotEquals(countSplitRecords, 1920800);
countTotalRecords += countSplitRecords;
}
Assert.assertEquals(countTotalRecords, 1920800);
}
}
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF and its only appender to A1.
log4j.rootLogger=OFF, A1
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
@@ -36,6 +36,7 @@ under the License.
<packaging>pom</packaging>
<modules>
<module>flink-orc</module>
<module>flink-jdbc</module>
<module>flink-hadoop-compatibility</module>
<module>flink-hbase</module>