Commit a5ec7c78 authored by Shang Shujie

Add support for decimals in Hive-generated Parquet files and for Unicode text, plus their test cases

Parent 857f1e10
......@@ -24,7 +24,7 @@ JAR_FILES = dist/gphd-1.0-gnet-1.0.0.1.jar \
JAVADOC_TARS = gnet-1.1-javadoc.tar gnet-1.0-javadoc.tar
all: $(JAR_FILES) $(JAVADOC_TARS)
all: $(JAR_FILES) unittest $(JAVADOC_TARS)
dist/gphd-1.0-gnet-1.0.0.1.jar:
$(ANT) clean
......
......@@ -42,13 +42,13 @@
<dependency org="apache" name="parquet-format" rev="2.3.0-incubating" conf="hadoop2->master"/>
<!-- dependency for UT -->
<dependency org="apache" name="hadoop-client" rev="2.0.5-alpha" conf="ut->*"/>
<dependency org="apache" name="hadoop-common" rev="2.0.5-alpha" conf="ut->*">
<dependency org="org.apache.hadoop" name="hadoop-client" rev="2.4.0" conf="ut->*"/>
<dependency org="org.apache.hadoop" name="hadoop-common" rev="2.4.0" conf="ut->*">
<artifact name="hadoop-common" type="jar"/>
<artifact name="hadoop-common" type="test-jar" ext="jar" e:classifier="tests" />
</dependency>
<dependency org="apache" name="hadoop-hdfs" rev="2.0.5-alpha" conf="ut->*">
<dependency org="org.apache.hadoop" name="hadoop-hdfs" rev="2.4.0" conf="ut->*">
<artifact name="hadoop-hdfs" type="jar"/>
<artifact name="hadoop-hdfs" type="test-jar" ext="jar" e:classifier="tests" />
</dependency>
......@@ -60,11 +60,12 @@
<dependency org="apache" name="avro" rev="1.7.7" conf="ut->*"/>
<dependency org="apache" name="avro-mapred" rev="1.7.7" conf="ut->*"/>
<dependency org="codehaus" name="jackson-core-asl" rev="1.9.3" conf="ut->*"/>
<dependency org="apache" name="parquet-column" rev="1.7.0" conf="ut->*"/>
<dependency org="apache" name="parquet-common" rev="1.7.0" conf="ut->*"/>
<dependency org="apache" name="parquet-hadoop" rev="1.7.0" conf="ut->*"/>
<dependency org="apache" name="parquet-encoding" rev="1.7.0" conf="ut->*"/>
<dependency org="apache" name="parquet-generator" rev="1.7.0" conf="ut->*"/>
<dependency org="apache" name="parquet-format" rev="2.3.0-incubating" conf="ut->*"/>
</dependencies>
</dependencies>
</ivy-module>
......@@ -2,6 +2,7 @@ package com.emc.greenplum.gpdb.hadoop.formathandler;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
......@@ -68,8 +69,10 @@ public class AvroFileReader {
DocumentBuilderFactory factory=DocumentBuilderFactory.newInstance();
TransformerFactory tf = TransformerFactory.newInstance();
public AvroFileReader(Configuration conf, int segid, int totalseg,
String inputpath, List<ColumnSchema> tableSchema, String schemaFile, boolean schemaComplete, boolean autoSelect) {
OutputStream out = System.out;
public AvroFileReader(Configuration conf, int segid, int totalseg,String inputpath, List<ColumnSchema> tableSchema,
String schemaFile, boolean schemaComplete, boolean autoSelect, OutputStream out) {
this.conf = conf;
this.segid = segid;
this.totalseg = totalseg;
......@@ -78,6 +81,7 @@ public class AvroFileReader {
this.schemaFile = schemaFile;
this.schemaComplete = schemaComplete;
this.autoSelect = autoSelect;
this.out = out;
}
/**
......@@ -111,7 +115,7 @@ public class AvroFileReader {
GpdbAvroInputFormat.setInputPaths(jconf, files);
InputSplit[] splits = (new GpdbAvroInputFormat()).getSplits(jconf, totalseg);
DataOutputStream dos = new DataOutputStream(System.out);
DataOutputStream dos = new DataOutputStream(out);
AvroWrapper<GenericRecord> avroWrapper = new AvroWrapper<GenericRecord>();
......
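Note on the constructor change above: replacing the hard-coded System.out with an injected OutputStream is what lets the new unit tests capture the GPDBWritable byte stream in memory. A minimal sketch of that usage, assuming the constructor signature shown in this diff and the same imports as TestConnectorUtil below (the path and argument values are illustrative only):

    Configuration conf = new Configuration();
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    AvroFileReader reader = new AvroFileReader(
            conf,                  // Hadoop configuration
            0,                     // segid: index of this segment
            1,                     // totalseg: total number of segments
            "/tmp/alertlog.avro",  // input path (illustrative)
            null,                  // tableSchema: null, derive from the Avro schema
            null,                  // schemaFile
            false,                 // schemaComplete
            false,                 // autoSelect
            bout);                 // injected output stream (System.out in production)
    reader.readAvroFormat();
    byte[] rows = bout.toByteArray();  // serialized GPDBWritable records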
......@@ -2,7 +2,10 @@ package com.emc.greenplum.gpdb.hadoop.formathandler;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
......@@ -47,6 +50,8 @@ import com.emc.greenplum.gpdb.hadoop.io.GPDBWritable;
import com.emc.greenplum.gpdb.hdfsconnector.ColumnSchema;
public class GpdbParquetFileReader {
private static final String HIVE_SCHEMA_NAME = "hive_schema";
boolean DATA_TIME_ANNOTATION_ON = false;
Configuration conf;
......@@ -64,11 +69,15 @@ public class GpdbParquetFileReader {
boolean autoSelect = false;
boolean isHiveFile = false;
OutputStream out = System.out;
DocumentBuilderFactory factory=DocumentBuilderFactory.newInstance();
TransformerFactory tf = TransformerFactory.newInstance();
public GpdbParquetFileReader(Configuration conf, int segid, int totalseg,
String inputpath, List<ColumnSchema> tableSchema, boolean schemaComplete, boolean autoSelect) {
String inputpath, List<ColumnSchema> tableSchema, boolean schemaComplete, boolean autoSelect, OutputStream out) {
this.conf = conf;
this.segid = segid;
this.totalseg = totalseg;
......@@ -76,6 +85,7 @@ public class GpdbParquetFileReader {
this.tableSchemas = tableSchema;
this.schemaComplete = schemaComplete;
this.autoSelect = autoSelect;
this.out = out;
}
/**
......@@ -93,7 +103,7 @@ public class GpdbParquetFileReader {
Collections.sort(toReadFileList);
DataOutputStream dos = new DataOutputStream(System.out);
DataOutputStream dos = new DataOutputStream(out);
int counter = 0;
for (FileStatus toRead : toReadFileList) {
......@@ -103,7 +113,7 @@ public class GpdbParquetFileReader {
schema = metadata.getFileMetaData().getSchema();
columnDescriptors = schema.getColumns();
generateTypeArray(schema.getFields());
isHiveFile = checkWhetherHive(schema);
if (tableSchemas != null) {
checkTypeMisMatch(schema, tableSchemas);
}
......@@ -144,6 +154,7 @@ public class GpdbParquetFileReader {
writable.write(dos);
}
}
dos.flush();
fileReader.close();
}
......@@ -151,6 +162,13 @@ public class GpdbParquetFileReader {
dos.close();
}
private boolean checkWhetherHive(MessageType schema) {
if (schema.getName() != null && schema.getName().equals(HIVE_SCHEMA_NAME)) {
return true;
}
return false;
}
/**
* check type mismatch
* @param parquetSchema, gpdbSchema
......@@ -321,8 +339,17 @@ public class GpdbParquetFileReader {
writable.setDouble(index, g.getDouble(index, 0));
break;
case INT96:
case FIXED_LEN_BYTE_ARRAY://fixed_len_byte_array, decimal, interval
case INT96://timestamp in hive
writable.setBytes(index, g.getInt96(index, 0).getBytes());
break;
case FIXED_LEN_BYTE_ARRAY://fixed_len_byte_array, decimal, interval, decimal in hive
if (isHiveFile && oType == OriginalType.DECIMAL) {
int scale = type.asPrimitiveType().getDecimalMetadata().getScale();
BigDecimal bd = new BigDecimal(new BigInteger(g.getBinary(index, 0).getBytes()), scale);
writable.setString(index, bd.toString());
break;
}
case BINARY://utf8, json, bson, decimal
// although the parquet type maps to bytea, check whether the user wants a 'text' field
if (oType == OriginalType.UTF8 || oType == OriginalType.JSON || oType == OriginalType.DECIMAL
......@@ -476,6 +503,9 @@ public class GpdbParquetFileReader {
break;
case INT96:
FormatHandlerUtil.byteArray2OctString(g.getInt96(fieldIndex, elementIndex).getBytes(), sb);
break;
case FIXED_LEN_BYTE_ARRAY:
if (oType == OriginalType.INTERVAL && DATA_TIME_ANNOTATION_ON) {
sb.append( FormatHandlerUtil.buildParquetInterval(g.getBinary(fieldIndex, elementIndex).getBytes()) );
......
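For reference, the decimal path added above relies on how Hive encodes DECIMAL in Parquet: a FIXED_LEN_BYTE_ARRAY holding the unscaled value as big-endian two's-complement bytes, with the scale carried in the column's decimal metadata. A standalone sketch of the same conversion, using illustrative values rather than real file contents:

    // unscaled value bytes as obtained via g.getBinary(index, 0).getBytes();
    // 0x01E13D is the big-endian two's-complement form of 123197 (illustrative)
    byte[] unscaledBytes = new byte[] {0x01, (byte) 0xE1, 0x3D};
    int scale = 2;  // from type.asPrimitiveType().getDecimalMetadata().getScale()
    BigDecimal value = new BigDecimal(new BigInteger(unscaledBytes), scale);
    System.out.println(value);  // prints 1231.97, which the reader emits as text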
......@@ -116,6 +116,16 @@ public class GPDBWritable implements Writable {
public static final int GPAOTID_ARR = 3301;
public static final int GPXLOGLOC_ARR = 3311;
/*
* field lengths, in bytes
*/
private static final int LONG_LEN = 8;
private static final int BOOLEAN_LEN = 1;
private static final int DOUBLE_LEN = 8;
private static final int INT_LEN = 4;
private static final int FLOAT_LEN = 4;
private static final int SHORT_LEN = 2;
/*
* Enum of the Database type
*/
......@@ -380,18 +390,24 @@ public class GPDBWritable implements Writable {
int endpadding = roundUpAlignment(datlen, 8) - datlen;
datlen += endpadding;
/* the actual output data length; as a safeguard it should equal datlen */
int realLen = 0;
/* Construct the packet header */
out.writeInt(datlen);
out.writeShort(DATA_VERSION);
out.writeShort(numCol);
realLen += INT_LEN + SHORT_LEN + SHORT_LEN;/*realLen += length of (datlen + DATA_VERSION + numCol)*/
/* Write col type */
for(int i=0; i<numCol; i++)
out.writeByte(enumType[i]);
realLen += numCol;
/* Nullness */
byte[] nullBytes = boolArrayToByteArray(nullBits);
out.write(nullBytes);
realLen += nullBytes.length;
/* Column Value */
for(int i=0; i<numCol; i++) {
......@@ -399,28 +415,31 @@ public class GPDBWritable implements Writable {
/* Pad the alignment byte first */
if (padLength[i] > 0) {
out.write(padbytes, 0, padLength[i]);
realLen += padLength[i];
}
/* Now, write the actual column value */
switch(colType[i]) {
case BIGINT: out.writeLong( ((Long) colValue[i]).longValue()); break;
case BOOLEAN: out.writeBoolean(((Boolean)colValue[i]).booleanValue()); break;
case FLOAT8: out.writeDouble( ((Double) colValue[i]).doubleValue()); break;
case INTEGER: out.writeInt( ((Integer)colValue[i]).intValue()); break;
case REAL: out.writeFloat( ((Float) colValue[i]).floatValue()); break;
case SMALLINT: out.writeShort( ((Short) colValue[i]).shortValue()); break;
case BIGINT: out.writeLong( ((Long) colValue[i]).longValue()); realLen += LONG_LEN; break;
case BOOLEAN: out.writeBoolean(((Boolean)colValue[i]).booleanValue()); realLen += BOOLEAN_LEN; break;
case FLOAT8: out.writeDouble( ((Double) colValue[i]).doubleValue()); realLen += DOUBLE_LEN; break;
case INTEGER: out.writeInt( ((Integer)colValue[i]).intValue()); realLen += INT_LEN; break;
case REAL: out.writeFloat( ((Float) colValue[i]).floatValue()); realLen += FLOAT_LEN; break;
case SMALLINT: out.writeShort( ((Short) colValue[i]).shortValue()); realLen += SHORT_LEN; break;
/* For BYTEA format, add a 4-byte length header at the beginning */
case BYTEA:
out.writeInt(((byte[])colValue[i]).length);
out.write((byte[])colValue[i]);
realLen += INT_LEN + ((byte[])colValue[i]).length; /*realLen += length of (length + data)*/
break;
/* For text format, add a 4-byte length header at the beginning
* (the length includes the trailing "\0") and append a "\0" at the end */
default: {
String outStr = (String)colValue[i]+"\0";
out.writeInt(outStr.length());
byte[] data = (outStr).getBytes(CHARSET);
out.writeInt(data.length);
out.write(data);
realLen += INT_LEN + data.length;/*realLen += length of (length + data)*/
break;
}
}
......@@ -429,6 +448,12 @@ public class GPDBWritable implements Writable {
/* End padding */
out.write(padbytes, 0, endpadding);
realLen += endpadding;
/* safeguard */
if (datlen != realLen) {
throw new IOException("data length error, data output size is not what expected");
}
}
/**
......
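The realLen bookkeeping added above mirrors the packet layout that datlen was computed from: a 4-byte total length, a 2-byte version, a 2-byte column count, one type byte per column, the null-indicator bytes, per-column alignment padding, the column values, and end padding to an 8-byte boundary. A rough, illustrative sketch of that accounting for a two-column tuple (INTEGER, TEXT "abc"); the exact numbers depend on boolArrayToByteArray() and roundUpAlignment(), so this is an approximation rather than the class's actual arithmetic:

    int numCol = 2;
    int header = 4 + 2 + 2;              // datlen (int) + DATA_VERSION (short) + numCol (short)
    int typeBytes = numCol;              // one enum byte per column
    int nullBytes = 1;                   // null-indicator bytes (assumed 1 here)
    int intCol = 4;                      // INTEGER payload
    byte[] text = "abc\0".getBytes(java.nio.charset.StandardCharsets.UTF_8);
    int textCol = 4 + text.length;       // 4-byte length header + bytes, including the trailing '\0'
    int body = header + typeBytes + nullBytes + intCol + textCol;  // ignoring per-column padding
    int endPadding = (8 - body % 8) % 8; // pad the packet to an 8-byte boundary
    int expectedDatlen = body + endPadding;  // what the safeguard compares against realLen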
......@@ -159,11 +159,11 @@ public class HDFSReader
readTextFormat();
}
else if (isAvroFormat) {
AvroFileReader avroReader = new AvroFileReader(conf, segid, totalseg, inputpath, tableSchema, schemaFile, schemaComplete, autoSelect);
AvroFileReader avroReader = new AvroFileReader(conf, segid, totalseg, inputpath, tableSchema, schemaFile, schemaComplete, autoSelect, System.out);
avroReader.readAvroFormat();
}
else if (isParquetFormat) {
GpdbParquetFileReader parquetReader = new GpdbParquetFileReader(conf, segid, totalseg, inputpath, tableSchema, schemaComplete, autoSelect);
GpdbParquetFileReader parquetReader = new GpdbParquetFileReader(conf, segid, totalseg, inputpath, tableSchema, schemaComplete, autoSelect, System.out);
parquetReader.readParquetFormat();
}
else {
......
......@@ -15,48 +15,178 @@
package com.emc.greenplum.gpdb.hdfsconnector;
import com.emc.greenplum.gpdb.hadoop.io.GPDBWritable;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.*;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import com.emc.greenplum.gpdb.hadoop.formathandler.AvroFileReader;
import com.emc.greenplum.gpdb.hadoop.formathandler.GpdbParquetFileReader;
import com.emc.greenplum.gpdb.hadoop.io.GPDBWritable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
public class TestConnectorUtil {
private static MiniDFSCluster cluster;
/*
* setup the cluster and upload test files
*/
@BeforeClass
public static void setupBeforeClass() throws Exception {
// final Configuration conf = new Configuration();
// cluster= new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).build();
public static void setupBeforeClass() throws IllegalArgumentException, IOException {
final Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
cluster.getFileSystem().mkdirs(new Path("/tmp"));
int hadoopPort = cluster.getNameNodePort();
String tmpDir = "hdfs://127.0.0.1:" + hadoopPort + "/tmp";
//decimal.pq : parquet file generated by hive, containing two decimal columns
Path decimal = new Path(tmpDir + "/decimal.pq");
cluster.getFileSystem().copyFromLocalFile(new Path((new File("")).getAbsolutePath() + "/src/test/case_file/decimal.pq"), decimal);
//alertlog.avro : file contains unicode text
Path alertlog = new Path(tmpDir + "/alertlog.avro");
cluster.getFileSystem().copyFromLocalFile(new Path((new File("")).getAbsolutePath() + "/src/test/case_file/alertlog.avro"), alertlog);
}
/*
* shutdown the cluster
*/
@AfterClass
public static void teardownAfterClass() throws Exception {
// cluster.shutdown();
cluster.shutdown();
}
/*
* test set for fs.defaultFS
*/
@Test
public void test_should_able_to_connect_to_hdfs() throws URISyntaxException {
public void test_should_able_to_connect_to_hdfs() throws URISyntaxException, IOException {
Configuration conf = new Configuration();
URI inputURI = new URI("gphdfs://localhost:8020/test.txt");
URI inputURI = new URI("gphdfs://localhost:9000/test.txt");
ConnectorUtil.setHadoopFSURI(conf, inputURI, "cdh4.1");
assertEquals("hdfs://localhost:8020", conf.get("fs.defaultFS"));
assertEquals("hdfs://localhost:9000", conf.get("fs.defaultFS"));
}
/*
* make sure all the test files are already in hadoop
*/
@Test
public void test_list_file() throws FileNotFoundException, IllegalArgumentException, IOException {
ArrayList<DataNode> dns = cluster.getDataNodes();
assertEquals(dns.size(), 1);
int fileNum = 0;
RemoteIterator<LocatedFileStatus> fsIterator= cluster.getFileSystem().listFiles(new Path("/"), true);
while(fsIterator.hasNext()){
fileNum ++;
System.out.println(fsIterator.next().getPath());
}
assertEquals(fileNum, 2);
}
/*
* test unicode support
*/
@Test
public void test_unicode() {
int hadoopPort = cluster.getNameNodePort();
Configuration conf = new Configuration();
conf.addResource("hdfs-site.xml");
try {
URI uri = new URI("gphdfs://localhost:" + hadoopPort + "/tmp/alertlog.avro");
ConnectorUtil.setHadoopFSURI(conf, uri, "gphdfs");
ByteArrayOutputStream bout = new ByteArrayOutputStream();
AvroFileReader aReader = new AvroFileReader(conf, 0, 1, uri.getPath(), null, null, false, false, bout);
aReader.readAvroFormat();
byte[] barray = bout.toByteArray();
int line = 0;
DataInputStream din = new DataInputStream(new ByteArrayInputStream(barray));
while(din.available() != 0) {
GPDBWritable writable = new GPDBWritable();
writable.readFields(din);
line++;
}
assertEquals(line, 1943);
} catch (IOException e) {
fail(e.getMessage());
} catch (URISyntaxException e) {
fail(e.getMessage());
}
}
/*
* test support for decimal in parquet file generated by hive
*/
@Test
public void test_hive_parquet_decimal() {
Configuration conf = new Configuration();
try {
int hadoopPort = cluster.getNameNodePort();
URI uri = new URI("gphdfs://localhost:" + hadoopPort + "/tmp/decimal.pq");
ConnectorUtil.setHadoopFSURI(conf, uri, "gphdfs");
String inputPath = uri.getPath();
ByteArrayOutputStream bout = new ByteArrayOutputStream();
GpdbParquetFileReader pReader = new GpdbParquetFileReader(conf, 0, 1, inputPath, null, false, false, bout);
pReader.readParquetFormat();
byte[] barray = bout.toByteArray();
//the expected byte stream that gphdfs writes to gpdb
byte[] expect = {0, 0, 0, 32, 0, 1, 0, 2, 7, 7, 0, 0, 0, 0, 0, 7, 49, 50, 51, 46, 50, 49, 0, 0, 0, 0, 0, 4, 51, 46, 49, 0};
for (int i = 0; i < barray.length; i++) {
assertEquals(barray[i], expect[i]);
}
DataInputStream din = new DataInputStream(new ByteArrayInputStream(barray));
GPDBWritable writable = new GPDBWritable();
writable.readFields(din);
String c1 = writable.getString(0);
String c2 = writable.getString(1);
assertEquals(c1, "123.21");
assertEquals(c2, "3.1");
assertEquals(din.available(), 0);
} catch (IOException e) {
fail(e.getMessage());
} catch (URISyntaxException e) {
fail(e.getMessage());
}
}
// @Test
public void test_should_able_to_connect_to_hdfs_with_ha() throws URISyntaxException {
Configuration conf = new Configuration();
URI inputURI = new URI("gphdfs://nameservice1/test.txt");
......@@ -65,4 +195,5 @@ public class TestConnectorUtil {
assertEquals("hdfs://nameservice1", conf.get("fs.defaultFS"));
}
}