Unverified commit 1444fd68, authored by Rui Li, committed by GitHub

[FLINK-16767][hive] Failed to read Hive table with RegexSerDe


This closes #11504
Parent e789bb71
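For context, the failure can be reproduced by declaring a Hive table with RegexSerDe and reading it through a Flink TableEnvironment backed by a HiveCatalog. The sketch below mirrors the regression test added at the end of this commit; the database/table names, the hiveCatalog variable, and the Hive-side DDL (shown as a comment) are illustrative, not part of this change.

```java
// Hive-side DDL (illustrative), e.g. executed via the Hive CLI or HiveShell:
//   CREATE TABLE db1.src (x int, y string)
//   ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
//   WITH SERDEPROPERTIES ('input.regex' = '([\\d]+)\u0001([\\S]+)');

// Flink side: register the HiveCatalog and query the table in batch mode.
TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tableEnv.useCatalog(hiveCatalog.getName());

// Before this fix, collecting the result failed: the RegexSerDe created by the
// split reader was initialized without the table-level SerDe properties
// (notably 'input.regex') stored in the metastore.
List<Row> rows = TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src order by x"));
```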
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -40,13 +41,17 @@ public class HiveTablePartition implements Serializable {
/** The map of partition key names and their values. */
private final Map<String, Object> partitionSpec;
public HiveTablePartition(StorageDescriptor storageDescriptor) {
this(storageDescriptor, new LinkedHashMap<>());
// Table properties that should be used to initialize SerDe
private final Properties tableProps;
public HiveTablePartition(StorageDescriptor storageDescriptor, Properties tableProps) {
this(storageDescriptor, new LinkedHashMap<>(), tableProps);
}
public HiveTablePartition(StorageDescriptor storageDescriptor, Map<String, Object> partitionSpec) {
public HiveTablePartition(StorageDescriptor storageDescriptor, Map<String, Object> partitionSpec, Properties tableProps) {
this.storageDescriptor = checkNotNull(storageDescriptor, "storageDescriptor can not be null");
this.partitionSpec = checkNotNull(partitionSpec, "partitionSpec can not be null");
this.tableProps = checkNotNull(tableProps, "tableProps can not be null");
}
public StorageDescriptor getStorageDescriptor() {
@@ -56,4 +61,8 @@ public class HiveTablePartition implements Serializable {
public Map<String, Object> getPartitionSpec() {
return partitionSpec;
}
public Properties getTableProps() {
return tableProps;
}
}
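The net effect of the change above is that every HiveTablePartition now carries the table-level properties needed to initialize its SerDe, on both the non-partitioned and the partitioned code paths. A minimal usage sketch of the two constructors follows; the property key/value are illustrative only, and storageDescriptor/partitionSpec are assumed to be at hand — in the real code path the Properties object comes from the metastore rather than being built by hand.

```java
// Illustrative only: HiveTableSource obtains this object via
// HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable).
Properties tableProps = new Properties();
tableProps.setProperty("input.regex", "([\\d]+)\t([\\S]+)");

// Non-partitioned table: the two-argument constructor fills in an empty partition spec.
HiveTablePartition tablePartition = new HiveTablePartition(storageDescriptor, tableProps);

// Partitioned table: pass the partition key -> value map explicitly.
HiveTablePartition partitionWithSpec =
        new HiveTablePartition(storageDescriptor, partitionSpec, tableProps);
```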
@@ -35,6 +35,7 @@ import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.catalog.hive.descriptors.HiveCatalogValidator;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
@@ -51,6 +52,7 @@ import org.apache.flink.util.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -66,6 +68,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
/**
@@ -281,6 +284,8 @@ public class HiveTableSource implements
String dbName = tablePath.getDatabaseName();
String tableName = tablePath.getObjectName();
List<String> partitionColNames = catalogTable.getPartitionKeys();
Table hiveTable = client.getTable(dbName, tableName);
Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
if (partitionColNames != null && partitionColNames.size() > 0) {
final String defaultPartitionName = jobConf.get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal);
@@ -309,11 +314,11 @@
}
partitionColValues.put(partitionColName, partitionObject);
}
HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues);
HiveTablePartition hiveTablePartition = new HiveTablePartition(sd, partitionColValues, tableProps);
allHivePartitions.add(hiveTablePartition);
}
} else {
allHivePartitions.add(new HiveTablePartition(client.getTable(dbName, tableName).getSd()));
allHivePartitions.add(new HiveTablePartition(hiveTable.getSd(), tableProps));
}
} catch (TException e) {
throw new FlinkHiveException("Failed to collect all partitions from hive metaStore", e);
......
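The key addition in HiveTableSource is that the table is now fetched once and its metadata converted to a Properties object via HiveReflectionUtils.getTableMetadata (reflection keeps the call compatible across supported Hive versions). Assuming it behaves like Hive's own MetaStoreUtils.getTableMetadata, the resulting object already contains the column names and types plus all SerDe and table parameters; a hedged sketch of what it might hold for the RegexSerDe table used in the test below (exact keys and values depend on the Hive version):

```java
Table hiveTable = client.getTable("db1", "src");
Properties tableProps = HiveReflectionUtils.getTableMetadata(hiveShim, hiveTable);
// Typical entries (assumption, following Hive's serdeConstants key names):
//   columns            -> "x,y"
//   columns.types      -> "int:string"
//   serialization.lib  -> "org.apache.hadoop.hive.serde2.RegexSerDe"
//   input.regex        -> the pattern from SERDEPROPERTIES, here ([\d]+)<0x01>([\S]+)
```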
@@ -22,7 +22,6 @@ import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter;
import org.apache.flink.connectors.hive.FlinkHiveException;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.table.dataformat.BaseRow;
import org.apache.flink.table.dataformat.DataFormatConverters;
import org.apache.flink.table.dataformat.GenericRow;
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR;
@@ -122,9 +120,7 @@ public class HiveMapredSplitReader implements SplitReader {
try {
deserializer = (Deserializer) Class.forName(sd.getSerdeInfo().getSerializationLib()).newInstance();
Configuration conf = new Configuration();
//properties are used to initialize hive Deserializer properly.
Properties properties = HiveTableUtil.createPropertiesFromStorageDescriptor(sd);
SerDeUtils.initializeSerDe(deserializer, conf, properties, null);
SerDeUtils.initializeSerDe(deserializer, conf, hiveTablePartition.getTableProps(), null);
structObjectInspector = (StructObjectInspector) deserializer.getObjectInspector();
structFields = structObjectInspector.getAllStructFieldRefs();
} catch (Exception e) {
......
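For completeness, here is a standalone sketch of the initialization the split reader now performs, exercised directly against Hive's RegexSerDe rather than through Flink (not part of this change; property keys follow Hive's serdeConstants and the values mirror the regression test). It shows why the complete table properties matter: RegexSerDe reads 'input.regex' from exactly this Properties object when it is initialized.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.RegexSerDe;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.Text;

import java.util.Properties;

public class RegexSerDeSketch {
    public static void main(String[] args) throws Exception {
        // Table properties as they would arrive from the metastore (values illustrative).
        Properties props = new Properties();
        props.setProperty(serdeConstants.LIST_COLUMNS, "x,y");
        props.setProperty(serdeConstants.LIST_COLUMN_TYPES, "int:string");
        props.setProperty("input.regex", "([\\d]+)\u0001([\\S]+)");

        // Same initialization call the split reader uses.
        Deserializer serde = new RegexSerDe();
        SerDeUtils.initializeSerDe(serde, new Configuration(), props, null);

        // Deserialize one raw text line; 0x01 is Hive's default field delimiter.
        Object row = serde.deserialize(new Text("1\u0001a"));
        StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
        System.out.println(inspector.getStructFieldsDataAsList(row)); // e.g. [1, a]
    }
}
```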
@@ -33,13 +33,9 @@ import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import java.util.ArrayList;
@@ -47,11 +43,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import static org.apache.flink.table.catalog.hive.HiveCatalogConfig.DEFAULT_LIST_COLUMN_TYPES_SEPARATOR;
/**
* Utils for Hive-backed tables.
*/
@@ -113,34 +106,6 @@ public class HiveTableUtil {
// Helper methods
// --------------------------------------------------------------------------------------------
/**
* Create properties info to initialize a SerDe.
* @param storageDescriptor
* @return
*/
public static Properties createPropertiesFromStorageDescriptor(StorageDescriptor storageDescriptor) {
SerDeInfo serDeInfo = storageDescriptor.getSerdeInfo();
Map<String, String> parameters = serDeInfo.getParameters();
Properties properties = new Properties();
properties.setProperty(
serdeConstants.SERIALIZATION_FORMAT,
parameters.get(serdeConstants.SERIALIZATION_FORMAT));
List<String> colTypes = new ArrayList<>();
List<String> colNames = new ArrayList<>();
List<FieldSchema> cols = storageDescriptor.getCols();
for (FieldSchema col: cols){
colTypes.add(col.getType());
colNames.add(col.getName());
}
properties.setProperty(serdeConstants.LIST_COLUMNS, StringUtils.join(colNames, String.valueOf(SerDeUtils.COMMA)));
// Note: serdeConstants.COLUMN_NAME_DELIMITER is not defined in previous Hive. We use a literal to save on shim
properties.setProperty("column.name.delimite", String.valueOf(SerDeUtils.COMMA));
properties.setProperty(serdeConstants.LIST_COLUMN_TYPES, StringUtils.join(colTypes, DEFAULT_LIST_COLUMN_TYPES_SEPARATOR));
properties.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
properties.putAll(parameters);
return properties;
}
/**
* Creates a Hive partition instance.
*/
......
@@ -565,6 +565,24 @@ public class TableEnvHiveConnectorTest {
}
}
@Test
public void testRegexSerDe() throws Exception {
hiveShell.execute("create database db1");
try {
hiveShell.execute("create table db1.src (x int,y string) " +
"row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe' " +
"with serdeproperties ('input.regex'='([\\\\d]+)\\u0001([\\\\S]+)')");
HiveTestUtils.createTextTableInserter(hiveShell, "db1", "src")
.addRow(new Object[]{1, "a"})
.addRow(new Object[]{2, "ab"})
.commit();
TableEnvironment tableEnv = getTableEnvWithHiveCatalog();
assertEquals("[1,a, 2,ab]", TableUtils.collectToList(tableEnv.sqlQuery("select * from db1.src order by x")).toString());
} finally {
hiveShell.execute("drop database db1 cascade");
}
}
private TableEnvironment getTableEnvWithHiveCatalog() {
TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode();
tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
......
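One detail of the test worth noting: HiveTestUtils.createTextTableInserter writes the data file with Hive's default text-table field delimiter (ASCII 0x01), which is what the '\u0001' in the test's regex matches — this is implied by the test rather than spelled out, so treat that layout as an assumption. A tiny self-contained check of the pattern:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main(String[] args) {
        // The same pattern the test puts into SERDEPROPERTIES, applied to a raw
        // line as it would appear in the data file (fields separated by 0x01).
        Pattern pattern = Pattern.compile("([\\d]+)\u0001([\\S]+)");
        Matcher matcher = pattern.matcher("1\u0001a");
        if (matcher.matches()) {
            System.out.println(matcher.group(1) + " | " + matcher.group(2)); // prints: 1 | a
        }
    }
}
```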