diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index f9a037b754c325d0a71f9a7d0d2849baf6e17884..02e15c928d32931e42e78e5b9eab2ad3135b3d71 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -388,10 +388,6 @@ under the License.
 					<groupId>org.apache.hive</groupId>
 					<artifactId>hive-jdbc</artifactId>
 				</exclusion>
-				<exclusion>
-					<groupId>org.apache.hive.hcatalog</groupId>
-					<artifactId>hive-webhcat-java-client</artifactId>
-				</exclusion>
 				<exclusion>
 					<groupId>org.apache.hive</groupId>
 					<artifactId>hive-service</artifactId>
@@ -400,10 +396,6 @@ under the License.
 					<groupId>org.apache.hive</groupId>
 					<artifactId>hive-contrib</artifactId>
 				</exclusion>
-				<exclusion>
-					<groupId>org.apache.tez</groupId>
-					<artifactId>tez-dag</artifactId>
-				</exclusion>
 				<exclusion>
 					<groupId>org.apache.tez</groupId>
 					<artifactId>tez-common</artifactId>
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
index d6cf124b71c064115eeefe32dddd49bb652716e8..e127639527a9b4b31fae8a8be8a32017086aa889 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/batch/connectors/hive/HiveTableSourceTest.java
@@ -30,39 +30,40 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientFactory;
-import org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper;
-import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
 import org.apache.flink.types.Row;
 
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.mapred.JobConf;
 import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
 /**
  * Tests {@link HiveTableSource}.
 */
+@RunWith(FlinkStandaloneHiveRunner.class)
 public class HiveTableSourceTest {
-	public static final String DEFAULT_SERDE_CLASS = org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName();
-	public static final String DEFAULT_INPUT_FORMAT_CLASS = org.apache.hadoop.mapred.TextInputFormat.class.getName();
-	public static final String DEFAULT_OUTPUT_FORMAT_CLASS = org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName();
+
+	@HiveSQL(files = {})
+	private static HiveShell hiveShell;
 
 	private static HiveCatalog hiveCatalog;
 	private static HiveConf hiveConf;
 
 	@BeforeClass
 	public static void createCatalog() throws IOException {
-		hiveConf = HiveTestUtils.createHiveConf();
+		hiveConf = hiveShell.getHiveConf();
 		hiveCatalog = HiveTestUtils.createHiveCatalog(hiveConf);
 		hiveCatalog.open();
 	}
@@ -74,10 +75,24 @@ public class HiveTableSourceTest {
 		}
 	}
 
+	@Before
+	public void setupSourceDatabaseAndData() {
+		hiveShell.execute("CREATE DATABASE IF NOT EXISTS source_db");
+	}
+
 	@Test
 	public void testReadNonPartitionedTable() throws Exception {
-		final String dbName = "default";
+		final String dbName = "source_db";
 		final String tblName = "test";
+		hiveShell.execute("CREATE TABLE source_db.test ( a INT, b INT, c STRING, d BIGINT, e DOUBLE)");
+		hiveShell.insertInto(dbName, tblName)
+				.withAllColumns()
+				.addRow(1, 1, "a", 1000L, 1.11)
+				.addRow(2, 2, "b", 2000L, 2.22)
+				.addRow(3, 3, "c", 3000L, 3.33)
+				.addRow(4, 4, "d", 4000L, 4.44)
+				.commit();
+
 		TableSchema tableSchema = new TableSchema(
 				new String[]{"a", "b", "c", "d", "e"},
 				new TypeInformation[]{
@@ -87,29 +102,6 @@ public class HiveTableSourceTest {
 						BasicTypeInfo.LONG_TYPE_INFO,
 						BasicTypeInfo.DOUBLE_TYPE_INFO}
 		);
-		//Now we used metaStore client to create hive table instead of using hiveCatalog for it doesn't support set
-		//serDe temporarily.
-		HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(hiveConf, null);
-		org.apache.hadoop.hive.metastore.api.Table tbl = new org.apache.hadoop.hive.metastore.api.Table();
-		tbl.setDbName(dbName);
-		tbl.setTableName(tblName);
-		tbl.setCreateTime((int) (System.currentTimeMillis() / 1000));
-		tbl.setParameters(new HashMap<>());
-		StorageDescriptor sd = new StorageDescriptor();
-		String location = HiveTableSourceTest.class.getResource("/test").getPath();
-		sd.setLocation(location);
-		sd.setInputFormat(DEFAULT_INPUT_FORMAT_CLASS);
-		sd.setOutputFormat(DEFAULT_OUTPUT_FORMAT_CLASS);
-		sd.setSerdeInfo(new SerDeInfo());
-		sd.getSerdeInfo().setSerializationLib(DEFAULT_SERDE_CLASS);
-		sd.getSerdeInfo().setParameters(new HashMap<>());
-		sd.getSerdeInfo().getParameters().put("serialization.format", "1");
-		sd.getSerdeInfo().getParameters().put("field.delim", ",");
-		sd.setCols(HiveTableUtil.createHiveColumns(tableSchema));
-		tbl.setSd(sd);
-		tbl.setPartitionKeys(new ArrayList<>());
-
-		client.createTable(tbl);
 		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
 		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
 		ObjectPath tablePath = new ObjectPath(dbName, tblName);
@@ -125,4 +117,42 @@ public class HiveTableSourceTest {
 		Assert.assertEquals(3, rows.get(2).getField(0));
 		Assert.assertEquals(4, rows.get(3).getField(0));
 	}
+
+	/**
+	 * Test to read from a partitioned table.
+	 * @throws Exception
+	 */
+	@Test
+	public void testReadPartitionTable() throws Exception {
+		final String dbName = "source_db";
+		final String tblName = "test_table_pt";
+		hiveShell.execute("CREATE TABLE source_db.test_table_pt " +
+				"(year STRING, value INT) partitioned by (pt int);");
+		hiveShell.insertInto("source_db", "test_table_pt")
+				.withColumns("year", "value", "pt")
+				.addRow("2014", 3, 0)
+				.addRow("2014", 4, 0)
+				.addRow("2015", 2, 1)
+				.addRow("2015", 5, 1)
+				.commit();
+		TableSchema tableSchema = new TableSchema(
+				new String[]{"year", "value", "pt"},
+				new TypeInformation[]{
+						BasicTypeInfo.STRING_TYPE_INFO,
+						BasicTypeInfo.INT_TYPE_INFO,
+						BasicTypeInfo.INT_TYPE_INFO}
+		);
+		ExecutionEnvironment execEnv = ExecutionEnvironment.createLocalEnvironment(1);
+		BatchTableEnvironment tEnv = BatchTableEnvironment.create(execEnv);
+		ObjectPath tablePath = new ObjectPath(dbName, tblName);
+		CatalogTable catalogTable = (CatalogTable) hiveCatalog.getTable(tablePath);
+		HiveTableSource hiveTableSource = new HiveTableSource(new JobConf(hiveConf), tablePath, catalogTable);
+		Table src = tEnv.fromTableSource(hiveTableSource);
+		DataSet<Row> rowDataSet = tEnv.toDataSet(src, new RowTypeInfo(tableSchema.getFieldTypes(), tableSchema.getFieldNames()));
+		List<Row> rows = rowDataSet.collect();
+		assertEquals(4, rows.size());
+		Object[] rowStrings = rows.stream().map(Row::toString).sorted().toArray();
+		assertArrayEquals(new String[]{"2014,3,0", "2014,4,0", "2015,2,1", "2015,5,1"}, rowStrings);
+	}
+
 }
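Reviewer note: the rewritten test provisions tables through HiveRunner's HiveShell instead of hand-building metastore Table/StorageDescriptor objects. For anyone reviewing who has not used HiveRunner before, below is a minimal, self-contained sketch of the HiveShell API this patch relies on. It assumes stock HiveRunner 4.x with its bundled StandaloneHiveRunner (the patch instead wires in FlinkStandaloneHiveRunner, which is not part of this diff); the test class, database, and table names here are illustrative only.

    import com.klarna.hiverunner.HiveShell;
    import com.klarna.hiverunner.StandaloneHiveRunner;
    import com.klarna.hiverunner.annotations.HiveSQL;
    import org.junit.Test;
    import org.junit.runner.RunWith;

    import java.util.List;

    import static org.junit.Assert.assertEquals;

    // Hypothetical test class, for illustration only.
    @RunWith(StandaloneHiveRunner.class)
    public class HiveShellSketchTest {

        // The runner instantiates and injects the shell before tests run;
        // files = {} means no HiveQL setup scripts are loaded.
        @HiveSQL(files = {})
        private HiveShell hiveShell;

        @Test
        public void insertAndQuery() {
            // DDL via execute(), mirroring the patch's @Before setup.
            hiveShell.execute("CREATE DATABASE IF NOT EXISTS demo_db");
            hiveShell.execute("CREATE TABLE demo_db.t (x INT, s STRING)");

            // The fluent test-data builder used by the patch:
            // insertInto(...).withAllColumns()/withColumns(...).addRow(...).commit().
            hiveShell.insertInto("demo_db", "t")
                    .withAllColumns()
                    .addRow(1, "a")
                    .addRow(2, "b")
                    .commit();

            // executeQuery() returns one delimited string per result row.
            List<String> rows = hiveShell.executeQuery("SELECT x FROM demo_db.t ORDER BY x");
            assertEquals(2, rows.size());
            assertEquals("1", rows.get(0));
        }
    }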