Commit 7b2e6e70 authored by zjuwangg, committed by bowen.li

[FLINK-13274] Refactor HiveTableSourceTest using HiveRunner

Refactor HiveTableSourceTest to use HiveRunner.

This closes #9130.
Parent 869a8072
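
For reviewers unfamiliar with HiveRunner, the pattern the refactored test adopts looks roughly like the sketch below. This is illustrative only, not part of the commit: the `demo` database, table `t`, and class name are made-up, and `FlinkStandaloneHiveRunner` is the Flink test runner referenced in the diff (it lives in the test's own package, which is why the diff adds no import for it).

	import com.klarna.hiverunner.HiveShell;
	import com.klarna.hiverunner.annotations.HiveSQL;
	import org.junit.Test;
	import org.junit.runner.RunWith;
	import java.util.List;
	import static org.junit.Assert.assertEquals;

	// The runner boots an in-process HiveServer and metastore for the test class;
	// @HiveSQL injects a HiveShell bound to it (files = {} means no setup scripts).
	@RunWith(FlinkStandaloneHiveRunner.class)
	public class HiveRunnerPatternSketch {
		@HiveSQL(files = {})
		private static HiveShell hiveShell;

		@Test
		public void insertAndQuery() {
			hiveShell.execute("CREATE DATABASE IF NOT EXISTS demo");
			hiveShell.execute("CREATE TABLE demo.t (x INT)");
			// Fluent builder that loads rows through Hive itself, so SerDe and
			// storage settings are exercised the same way real HiveQL inserts are.
			hiveShell.insertInto("demo", "t")
					.withAllColumns()
					.addRow(1)
					.commit();
			List<String> results = hiveShell.executeQuery("SELECT x FROM demo.t");
			assertEquals(1, results.size());
		}
	}

Compared with the hand-assembled metastore Table/StorageDescriptor objects this commit deletes, the shell runs real HiveQL against an embedded Hive, so DDL, data loading, and SerDe configuration are tested the way production Hive applies them.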
@@ -388,10 +388,6 @@ under the License.
 					<groupId>org.apache.hive</groupId>
 					<artifactId>hive-jdbc</artifactId>
 				</exclusion>
-				<exclusion>
-					<groupId>org.apache.hive.hcatalog</groupId>
-					<artifactId>hive-webhcat-java-client</artifactId>
-				</exclusion>
 				<exclusion>
 					<groupId>org.apache.hive</groupId>
 					<artifactId>hive-service</artifactId>
@@ -400,10 +396,6 @@ under the License.
 					<groupId>org.apache.hive</groupId>
 					<artifactId>hive-contrib</artifactId>
 				</exclusion>
-				<exclusion>
-					<groupId>org.apache.tez</groupId>
-					<artifactId>tez-dag</artifactId>
-				</exclusion>
 				<exclusion>
 					<groupId>org.apache.tez</groupId>
 					<artifactId>tez-common</artifactId>
@@ -30,39 +30,40 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.types.Row;
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 /**
  * Tests {@link HiveTableSource}.
  */
+@RunWith(FlinkStandaloneHiveRunner.class)
 public class HiveTableSourceTest {
-	public static final String DEFAULT_SERDE_CLASS = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
-	public static final String DEFAULT_INPUT_FORMAT_CLASS = org.apache.hadoop.mapred.TextInputFormat.class.getName();
-	public static final String DEFAULT_OUTPUT_FORMAT_CLASS = org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName();
+	@HiveSQL(files = {})
+	private static HiveShell hiveShell;
 	private static HiveCatalog hiveCatalog;
 	private static HiveConf hiveConf;
 	@BeforeClass
 	public static void createCatalog() throws IOException {
-		hiveConf = HiveTestUtils.createHiveConf();
+		hiveConf = hiveShell.getHiveConf();
 		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
 		hiveCatalog.open();
 	}
@@ -74,10 +75,24 @@ public class HiveTableSourceTest {
 		}
 	}
+	@Before
+	public void setupSourceDatabaseAndData() {
+		hiveShell.execute("CREATE DATABASE IF NOT EXISTS source_db");
+	}
 	@Test
 	public void testReadNonPartitionedTable() throws Exception {
-		final String dbName = "default";
+		final String dbName = "source_db";
 		final String tblName = "test";
+		hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
+		hiveShell.insertInto(dbName, tblName)
+				.withAllColumns()
+				.addRow(1, 1, "a", 1000L, 1.11)
+				.addRow(2, 2, "b", 2000L, 2.22)
+				.addRow(3, 3, "c", 3000L, 3.33)
+				.addRow(4, 4, "d", 4000L, 4.44)
+				.commit();
 		TableSchema tableSchema = new TableSchema(
 				new String[]{"a", "b", "c", "d", "e"},
 				new TypeInformation[]{
@@ -87,29 +102,6 @@ public class HiveTableSourceTest {
 						BasicTypeInfo.LONG_TYPE_INFO,
 						BasicTypeInfo.DOUBLE_TYPE_INFO}
 		);
-		//Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-		//serDe temporarily.
-		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
-		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-		tbl.setDbName(dbName);
-		tbl.setTableName(tblName);
-		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-		tbl.setParameters(new HashMap<>());
-		StorageDescriptor sd = new StorageDescriptor();
-		String location = HiveTableSourceTest.class.getResource("/test").getPath();
-		sd.setLocation(location);
-		sd.setInputFormat(DEFAULT_INPUT_FORMAT_CLASS);
-		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-		sd.setSerdeInfo(new SerDeInfo());
-		sd.getSerdeInfo().setSerializationLib(DEFAULT_SERDE_CLASS);
-		sd.getSerdeInfo().setParameters(new HashMap<>());
-		sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-		sd.getSerdeInfo().getParameters().put("field.delim", ",");
-		sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
-		tbl.setSd(sd);
-		tbl.setPartitionKeys(new ArrayList<>());
-		client.createTable(tbl);
 		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
 		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
@@ -125,4 +117,42 @@ public class HiveTableSourceTest {
 		Assert.assertEquals(3, rows.get(2).getField(0));
 		Assert.assertEquals(4, rows.get(3).getField(0));
 	}
+	/**
+	 * Tests reading from a partitioned table.
+	 */
+	@Test
+	public void testReadPartitionTable() throws Exception {
+		final String dbName = "source_db";
+		final String tblName = "test_table_pt";
+		hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
+				"(year STRING, value INT) partitioned by (pt int);");
+		hiveShell.insertInto("source_db", "test_table_pt")
+				.withColumns("year", "value", "pt")
+				.addRow("2014", 3, 0)
+				.addRow("2014", 4, 0)
+				.addRow("2015", 2, 1)
+				.addRow("2015", 5, 1)
+				.commit();
+		TableSchema tableSchema = new TableSchema(
+				new String[]{"year", "value", "pt"},
+				new TypeInformation[]{
+						BasicTypeInfo.STRING_TYPE_INFO,
+						BasicTypeInfo.INT_TYPE_INFO,
+						BasicTypeInfo.INT_TYPE_INFO}
+		);
+		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
+		ObjectPath tablePath = new ObjectPath(dbName, tblName);
+		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+		HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
+		Table src = tEnv.fromTableSource(hiveTableSource);
+		DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()));
+		List<Row> rows = rowDataSet.collect();
+		assertEquals(4, rows.size());
+		Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
+		assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
+	}
 }