Commit 479be148 authored by Ufuk Celebi

[FLINK-3872] [table, connector-kafka] Add KafkaTableSource

This closes #2069.
Parent 3ac080f0
......@@ -207,6 +207,49 @@ A `TableSource` can provide access to data stored in various storage systems suc
Currently, Flink only provides a `CsvTableSource` to read CSV files. A custom `TableSource` can be defined by implementing the `BatchTableSource` or `StreamTableSource` interface.
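As a rough orientation, a custom source only has to describe its schema and produce a `DataStream<Row>`. The hypothetical sketch below implements the same `StreamTableSource` methods that the `KafkaTableSource` introduced by this commit overrides; the class name, fields, and values are purely illustrative.

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Collections;

// Hypothetical example: a stream table source that emits a single hard-coded row.
public class SingleRowTableSource implements StreamTableSource<Row> {

    private final String[] fieldNames = new String[] { "id", "name" };
    private final TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
        BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO };

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
        // Build one row matching the declared schema.
        Row row = new Row(2);
        row.setField(0, 1);
        row.setField(1, "flink");
        return env.fromCollection(Collections.singletonList(row), getReturnType());
    }

    @Override
    public int getNumberOfFields() {
        return fieldNames.length;
    }

    @Override
    public String[] getFieldsNames() {
        return fieldNames;
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return fieldTypes;
    }

    @Override
    public TypeInformation<Row> getReturnType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }
}
```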
### Available Table Sources
| **Class name** | **Maven dependency** | **Batch?** | **Streaming?** | **Description** |
| -------------- | -------------------- | ---------- | -------------- | --------------- |
| `CsvTableSource` | `flink-table` | Y | Y | A simple source for CSV files with up to 25 fields. |
| `Kafka08JsonTableSource` | `flink-connector-kafka-0.8` | N | Y | A Kafka 0.8 source for JSON data. |
| `Kafka09JsonTableSource` | `flink-connector-kafka-0.9` | N | Y | A Kafka 0.9 source for JSON data. |
All sources that come with the `flink-table` dependency can be used directly in your Table programs. For all other table sources, you have to add the respective dependency in addition to the `flink-table` dependency.
#### KafkaJsonTableSource
To use the Kafka JSON source, you have to add the Kafka connector dependency to your project:
- `flink-connector-kafka-0.8` for Kafka 0.8, and
- `flink-connector-kafka-0.9` for Kafka 0.9, respectively.
You can then create the source as follows (example for Kafka 0.8):
```java
// The JSON field names and types
String[] fieldNames = new String[] { "id", "name", "score" };
Class<?>[] fieldTypes = new Class<?>[] { Integer.class, String.class, Double.class };

KafkaJsonTableSource kafkaTableSource = new Kafka08JsonTableSource(
    kafkaTopic,
    kafkaProperties,
    fieldNames,
    fieldTypes);
```
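The Kafka 0.9 source is created the same way. Both JSON sources also provide a constructor that takes explicit `TypeInformation` for the field types instead of classes; a sketch reusing the topic, properties, and field names from the example above:

```java
// Same schema as above, expressed as TypeInformation instead of classes
TypeInformation<?>[] typeInfos = new TypeInformation<?>[] {
    BasicTypeInfo.INT_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.DOUBLE_TYPE_INFO };

KafkaJsonTableSource kafka09TableSource = new Kafka09JsonTableSource(
    kafkaTopic,
    kafkaProperties,
    fieldNames,
    typeInfos);
```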
By default, a missing JSON field does not fail the source. You can configure this via:
```java
// Fail on missing JSON field
tableSource.setFailOnMissingField(true);
```
You can work with the Table as explained in the rest of the Table API guide:
```java
tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
Table result = tableEnvironment.ingest("kafka-source");
```
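From here the registered table behaves like any other `Table`: you can apply relational operators and convert the result back into a `DataStream`. A minimal sketch, assuming the field names from the earlier example and an illustrative threshold:

```java
// Filter and project the ingested table (field names and threshold are illustrative)
Table highScores = result
    .filter("score > 50")
    .select("id, name, score");

// Convert back to a DataStream of Rows for further processing or a sink
DataStream<Row> rowStream = tableEnvironment.toDataStream(highScores, Row.class);
rowStream.print();
```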
Table API
----------
......
......@@ -55,7 +55,6 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
......@@ -63,6 +62,16 @@ under the License.
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.10</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<!-- Projects depending on this project
won't depend on flink-table. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Properties;
/**
* Kafka JSON {@link StreamTableSource} for Kafka 0.8.
*/
public class Kafka08JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.8 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
public Kafka08JsonTableSource(
String topic,
Properties properties,
String[] fieldNames,
TypeInformation<?>[] fieldTypes) {
super(topic, properties, fieldNames, fieldTypes);
}
/**
* Creates a Kafka 0.8 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
public Kafka08JsonTableSource(
String topic,
Properties properties,
String[] fieldNames,
Class<?>[] fieldTypes) {
super(topic, properties, fieldNames, fieldTypes);
}
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.8.
*/
public class Kafka08TableSource extends KafkaTableSource {
/**
* Creates a Kafka 0.8 {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
public Kafka08TableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
String[] fieldNames,
TypeInformation<?>[] fieldTypes) {
super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
}
/**
* Creates a Kafka 0.8 {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
public Kafka08TableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
String[] fieldNames,
Class<?>[] fieldTypes) {
super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
}
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
}
}
......@@ -18,8 +18,9 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -31,6 +32,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
......@@ -360,4 +362,66 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
curatorFramework.close();
}
@Test
public void testJsonTableSource() throws Exception {
String topic = UUID.randomUUID().toString();
// Names and types are determined in the actual test method of the
// base test class.
Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
topic,
standardProps,
new String[] {
"long",
"string",
"boolean",
"double",
"missing-field"},
new TypeInformation<?>[] {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO });
// Don't fail on missing field, but set to null (default)
tableSource.setFailOnMissingField(false);
runJsonTableSource(topic, tableSource);
}
@Test
public void testJsonTableSourceWithFailOnMissingField() throws Exception {
String topic = UUID.randomUUID().toString();
// Names and types are determined in the actual test method of the
// base test class.
Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
topic,
standardProps,
new String[] {
"long",
"string",
"boolean",
"double",
"missing-field"},
new TypeInformation<?>[] {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO });
// Fail on a missing JSON field
tableSource.setFailOnMissingField(true);
try {
runJsonTableSource(topic, tableSource);
fail("Did not throw expected Exception");
} catch (Exception e) {
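// The expected IllegalStateException is wrapped by the client and job execution layers; unwrap to the root cause.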
Throwable rootCause = e.getCause().getCause().getCause();
assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
}
}
}
......@@ -70,7 +70,6 @@ under the License.
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
......@@ -99,6 +98,15 @@ under the License.
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.10</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<!-- Projects depending on this project
won't depend on flink-table. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Properties;
/**
* Kafka JSON {@link StreamTableSource} for Kafka 0.9.
*/
public class Kafka09JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.9 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
public Kafka09JsonTableSource(
String topic,
Properties properties,
String[] fieldNames,
TypeInformation<?>[] fieldTypes) {
super(topic, properties, fieldNames, fieldTypes);
}
/**
* Creates a Kafka 0.9 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
public Kafka09JsonTableSource(
String topic,
Properties properties,
String[] fieldNames,
Class<?>[] fieldTypes) {
super(topic, properties, fieldNames, fieldTypes);
}
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.9.
*/
public class Kafka09TableSource extends KafkaTableSource {
/**
* Creates a Kafka 0.9 {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
public Kafka09TableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
String[] fieldNames,
TypeInformation<?>[] fieldTypes) {
super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
}
/**
* Creates a Kafka 0.9 {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
public Kafka09TableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
String[] fieldNames,
Class<?>[] fieldTypes) {
super(topic, properties, deserializationSchema, fieldNames, fieldTypes);
}
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
}
}
......@@ -17,8 +17,15 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.junit.Test;
import java.util.UUID;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class Kafka09ITCase extends KafkaConsumerTestBase {
......@@ -115,4 +122,66 @@ public class Kafka09ITCase extends KafkaConsumerTestBase {
runMetricsAndEndOfStreamTest();
}
@Test
public void testJsonTableSource() throws Exception {
String topic = UUID.randomUUID().toString();
// Names and types are determined in the actual test method of the
// base test class.
Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
topic,
standardProps,
new String[] {
"long",
"string",
"boolean",
"double",
"missing-field"},
new TypeInformation<?>[] {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO });
// Don't fail on missing field, but set to null (default)
tableSource.setFailOnMissingField(false);
runJsonTableSource(topic, tableSource);
}
@Test
public void testJsonTableSourceWithFailOnMissingField() throws Exception {
String topic = UUID.randomUUID().toString();
// Names and types are determined in the actual test method of the
// base test class.
Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
topic,
standardProps,
new String[] {
"long",
"string",
"boolean",
"double",
"missing-field"},
new TypeInformation<?>[] {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO });
// Fail on a missing JSON field
tableSource.setFailOnMissingField(true);
try {
runJsonTableSource(topic, tableSource);
fail("Did not throw expected Exception");
} catch (Exception e) {
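// The expected IllegalStateException is wrapped by the client and job execution layers; unwrap to the root cause.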
Throwable rootCause = e.getCause().getCause().getCause();
assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import java.util.Properties;
/**
* A version-agnostic Kafka JSON {@link StreamTableSource}.
*
* <p>The version-specific Kafka table sources need to extend this class and
* override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
*
* <p>The field names and types are used to parse the JSON records.
*/
public abstract class KafkaJsonTableSource extends KafkaTableSource {
/**
* Creates a generic Kafka JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
KafkaJsonTableSource(
String topic,
Properties properties,
String[] fieldNames,
Class<?>[] fieldTypes) {
super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
}
/**
* Creates a generic Kafka JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
KafkaJsonTableSource(
String topic,
Properties properties,
String[] fieldNames,
TypeInformation<?>[] fieldTypes) {
super(topic, properties, createDeserializationSchema(fieldNames, fieldTypes), fieldNames, fieldTypes);
}
/**
* Configures the failure behaviour if a JSON field is missing.
*
* <p>By default, a missing field is ignored and the field is set to null.
*
* @param failOnMissingField Flag indicating whether to fail or not on a missing field.
*/
public void setFailOnMissingField(boolean failOnMissingField) {
JsonRowDeserializationSchema deserializationSchema = (JsonRowDeserializationSchema) getDeserializationSchema();
deserializationSchema.setFailOnMissingField(failOnMissingField);
}
private static JsonRowDeserializationSchema createDeserializationSchema(
String[] fieldNames,
TypeInformation<?>[] fieldTypes) {
return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
}
private static JsonRowDeserializationSchema createDeserializationSchema(
String[] fieldNames,
Class<?>[] fieldTypes) {
return new JsonRowDeserializationSchema(fieldNames, fieldTypes);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.sources.StreamTableSource;
import org.apache.flink.api.table.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.Preconditions;
import java.util.Properties;
/**
* A version-agnostic Kafka {@link StreamTableSource}.
*
* <p>The version-specific Kafka table sources need to extend this class and
* override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}.
*/
abstract class KafkaTableSource implements StreamTableSource<Row> {
/** The Kafka topic to consume. */
private final String topic;
/** Properties for the Kafka consumer. */
private final Properties properties;
/** Deserialization schema to use for Kafka records. */
private final DeserializationSchema<Row> deserializationSchema;
/** Row field names. */
private final String[] fieldNames;
/** Row field types. */
private final TypeInformation<?>[] fieldTypes;
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
KafkaTableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
String[] fieldNames,
Class<?>[] fieldTypes) {
this(topic, properties, deserializationSchema, fieldNames, toTypeInfo(fieldTypes));
}
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @param fieldNames Row field names.
* @param fieldTypes Row field types.
*/
KafkaTableSource(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema,
String[] fieldNames,
TypeInformation<?>[] fieldTypes) {
this.topic = Preconditions.checkNotNull(topic, "Topic");
this.properties = Preconditions.checkNotNull(properties, "Properties");
this.deserializationSchema = Preconditions.checkNotNull(deserializationSchema, "Deserialization schema");
this.fieldNames = Preconditions.checkNotNull(fieldNames, "Field names");
this.fieldTypes = Preconditions.checkNotNull(fieldTypes, "Field types");
Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
"Number of provided field names and types does not match.");
}
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
// Version-specific Kafka consumer
FlinkKafkaConsumerBase<Row> kafkaConsumer = getKafkaConsumer(topic, properties, deserializationSchema);
DataStream<Row> kafkaSource = env.addSource(kafkaConsumer);
return kafkaSource;
}
@Override
public int getNumberOfFields() {
return fieldNames.length;
}
@Override
public String[] getFieldsNames() {
return fieldNames;
}
@Override
public TypeInformation<?>[] getFieldTypes() {
return fieldTypes;
}
@Override
public TypeInformation<Row> getReturnType() {
return new RowTypeInfo(fieldTypes, fieldNames);
}
/**
* Returns the version-specific Kafka consumer.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @return The version-specific Kafka consumer
*/
abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<Row> deserializationSchema);
/**
* Returns the deserialization schema.
*
* @return The deserialization schema
*/
protected DeserializationSchema<Row> getDeserializationSchema() {
return deserializationSchema;
}
/**
* Creates TypeInformation array for an array of Classes.
*/
private static TypeInformation<?>[] toTypeInfo(Class<?>[] fieldTypes) {
TypeInformation<?>[] typeInfos = new TypeInformation[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
typeInfos[i] = TypeExtractor.getForClass(fieldTypes[i]);
}
return typeInfos;
}
}
......@@ -18,6 +18,8 @@
package org.apache.flink.streaming.connectors.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
......@@ -25,7 +27,6 @@ import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.server.KafkaServer;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobExecutionResult;
......@@ -38,9 +39,12 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.table.StreamTableEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.api.table.Row;
import org.apache.flink.api.table.Table;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
......@@ -74,23 +78,21 @@ import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2Partitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.testutils.junit.RetryOnException;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
......@@ -107,6 +109,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.test.util.TestUtils.tryExecute;
......@@ -736,6 +739,121 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBase {
}
}
/**
* Runs a table source test with JSON data.
*
* The table source needs to parse the following JSON fields:
* - "long" -> number
* - "string" -> "string"
* - "boolean" -> true|false
* - "double" -> fraction
*/
public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
final ObjectMapper mapper = new ObjectMapper();
final int numElements = 1024;
final long[] longs = new long[numElements];
final String[] strings = new String[numElements];
final boolean[] booleans = new boolean[numElements];
final double[] doubles = new double[numElements];
final byte[][] serializedJson = new byte[numElements][];
ThreadLocalRandom random = ThreadLocalRandom.current();
for (int i = 0; i < numElements; i++) {
longs[i] = random.nextLong();
strings[i] = Integer.toHexString(random.nextInt());
booleans[i] = random.nextBoolean();
doubles[i] = random.nextDouble();
ObjectNode entry = mapper.createObjectNode();
entry.put("long", longs[i]);
entry.put("string", strings[i]);
entry.put("boolean", booleans[i]);
entry.put("double", doubles[i]);
serializedJson[i] = mapper.writeValueAsBytes(entry);
}
// Produce serialized JSON data
createTestTopic(topic, 1, 1);
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createRemoteEnvironment("localhost", flinkPort);
env.getConfig().disableSysoutLogging();
env.addSource(new SourceFunction<byte[]>() {
@Override
public void run(SourceContext<byte[]> ctx) throws Exception {
for (int i = 0; i < numElements; i++) {
ctx.collect(serializedJson[i]);
}
}
@Override
public void cancel() {
}
}).addSink(kafkaServer.getProducer(
topic,
new ByteArraySerializationSchema(),
standardProps,
null));
// execute() blocks until the finite producer job has written all records to Kafka
env.execute();
// Register as table source
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
tableEnvironment.registerTableSource("kafka", kafkaTableSource);
Table result = tableEnvironment.ingest("kafka");
tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {
int i = 0;
@Override
public void invoke(Row value) throws Exception {
assertEquals(5, value.productArity());
assertEquals(longs[i], value.productElement(0));
assertEquals(strings[i], value.productElement(1));
assertEquals(booleans[i], value.productElement(2));
assertEquals(doubles[i], value.productElement(3));
assertNull(value.productElement(4));
if (i == numElements-1) {
throw new SuccessException();
} else {
i++;
}
}
});
tryExecutePropagateExceptions(env, "KafkaTableSource");
}
/**
* Serialization schema forwarding byte[] records.
*/
private static class ByteArraySerializationSchema implements KeyedSerializationSchema<byte[]> {
@Override
public byte[] serializeKey(byte[] element) {
return null;
}
@Override
public byte[] serializeValue(byte[] element) {
return element;
}
@Override
public String getTargetTopic(byte[] element) {
return null;
}
}
private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
......