提交 09aa841d 编写于 作者: M mbalassi 提交者: Stephan Ewen

[FLINK-1638] [streaming] Added persistent Kafka source

Exposed state registering in the public API
上级 e7485c23
......@@ -19,10 +19,9 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaCustomOffsetSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.SimpleKafkaSource;
import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
import org.apache.flink.streaming.state.SimpleState;
public class KafkaConsumerExample {
......@@ -43,7 +42,8 @@ public class KafkaConsumerExample {
.addSource(
// new KafkaSource<String>(host + ":" + port, topic, new JavaDefaultStringSchema()))
// new SimpleKafkaSource<String>(topic, host, port, new JavaDefaultStringSchema()))
new KafkaCustomOffsetSource<String>(topic, host, port, new JavaDefaultStringSchema()))
new PersistentKafkaSource<String>(topic, host, port, 10L, new JavaDefaultStringSchema()))
.registerState("kafka", new SimpleState<Long>())
.setParallelism(3)
.print().setParallelism(3);
......
......@@ -42,7 +42,7 @@ public class KafkaProducerExample {
DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
@Override
public void run(Collector<String> collector) throws Exception {
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 20; i++) {
collector.collect("message #" + i);
Thread.sleep(100L);
}
......
......@@ -78,7 +78,7 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
}
/**
 * Creates a KafkaSource reading from the given topic via Zookeeper, using the
 * default consumer group id.
 *
 * @param zookeeperHost
 *            Zookeeper connection string (host:port)
 * @param topicId
 *            Kafka topic to consume
 * @param deserializationSchema
 *            Schema used to turn raw Kafka messages into OUT records
 * @param zookeeperSyncTimeMillis
 *            Zookeeper sync time in milliseconds
 */
public KafkaSource(String zookeeperHost, String topicId,
		DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
	// Bug fix: zookeeperSyncTimeMillis was accepted but silently ignored —
	// the delegate call hard-coded ZOOKEEPER_DEFAULT_SYNC_TIME instead of
	// forwarding the caller's value.
	this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema, zookeeperSyncTimeMillis);
}
......
......@@ -26,7 +26,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
import kafka.admin.AdminUtils;
public class KafkaTopicCreator {
public class KafkaTopicFactory {
public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) {
createTopic(zookeeperServer, topicName, numOfPartitions, replicationFactor, new Properties(), 10000, 10000);
......
......@@ -18,9 +18,19 @@
package org.apache.flink.streaming.connectors.kafka.api.simple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
import org.apache.flink.streaming.state.SimpleState;
import org.apache.flink.util.Collector;
public class KafkaCustomOffsetSource<OUT> extends SimpleKafkaSource<OUT> {
public class PersistentKafkaSource<OUT> extends SimpleKafkaSource<OUT> {
private static final long NUM_RECORDS_PER_CHECKPOINT = 1000;
private long initialOffset;
private transient SimpleState<Long> kafkaOffSet;
private transient long checkpointCounter;
/**
* Partition index is set automatically by instance id.
......@@ -30,17 +40,48 @@ public class KafkaCustomOffsetSource<OUT> extends SimpleKafkaSource<OUT> {
* @param port
* @param deserializationSchema
*/
public KafkaCustomOffsetSource(String topicId, String host, int port, DeserializationSchema<OUT> deserializationSchema) {
public PersistentKafkaSource(String topicId, String host, int port, long initialOffset, DeserializationSchema<OUT> deserializationSchema) {
super(topicId, host, port, deserializationSchema);
this.initialOffset = initialOffset;
}
/**
 * Restores the Kafka read offset from previously registered operator state.
 * Falls back to the configured initial offset when no checkpointed state
 * exists yet, then delegates to the superclass to open the Kafka connection.
 */
@Override
public void open(Configuration parameters) {
	StreamingRuntimeContext streamingContext = (StreamingRuntimeContext) getRuntimeContext();
	@SuppressWarnings("unchecked")
	SimpleState<Long> restoredOffset = (SimpleState<Long>) streamingContext.getState("kafka");
	// Use the checkpointed offset when present; otherwise start fresh from initialOffset.
	kafkaOffSet = restoredOffset.getState() != null
			? restoredOffset
			: new SimpleState<Long>(initialOffset);
	checkpointCounter = 0;
	super.open(parameters);
}
@Override
protected void setInitialOffset(Configuration config) {
iterator.initializeFromOffset(10);
iterator.initializeFromOffset(kafkaOffSet.getState());
}
// Debug hook invoked for every consumed message: prints the Kafka offset and
// the deserialized payload to stdout.
// NOTE(review): System.out in a streaming source is debug-only; a proper
// logger would be preferable — TODO confirm this is intentional.
@Override
protected void gotMessage(MessageWithOffset msg) {
System.out.println(msg.getOffset() + " :: " + schema.deserialize(msg.getMessage()));
}
/**
 * Main consume loop: pulls messages from the Kafka iterator, deserializes
 * them, emits them to the collector, and periodically checkpoints the current
 * offset so the source can resume from it after a failure.
 *
 * @param collector
 *            Output collector receiving the deserialized records
 * @throws Exception
 *             Propagated from the Kafka iterator or the collector
 */
@Override
public void run(Collector<OUT> collector) throws Exception {
	MessageWithOffset msg;
	while (iterator.hasNext()) {
		msg = iterator.nextWithOffset();
		gotMessage(msg);
		OUT out = schema.deserialize(msg.getMessage());
		collector.collect(out);
		// Bug fix: checkpointCounter was never incremented (nor reset), so
		// the offset-checkpoint branch could never be taken and no offset was
		// ever persisted. Count emitted records and checkpoint every
		// NUM_RECORDS_PER_CHECKPOINT of them.
		if (++checkpointCounter >= NUM_RECORDS_PER_CHECKPOINT) {
			kafkaOffSet = new SimpleState<Long>(msg.getOffset());
			kafkaOffSet.checkpoint();
			checkpointCounter = 0;
		}
	}
}
}
......@@ -38,8 +38,7 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
* @param port
* @param deserializationSchema
*/
public SimpleKafkaSource(String topicId,
String host, int port, DeserializationSchema<OUT> deserializationSchema) {
public SimpleKafkaSource(String topicId, String host, int port, DeserializationSchema<OUT> deserializationSchema) {
super(deserializationSchema);
this.topicId = topicId;
this.host = host;
......@@ -55,12 +54,13 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
iterator.initializeFromCurrent();
}
// Debug-only hook: called once per consumed message so subclasses (e.g. a
// persistent source) can inspect the message and its offset. The base
// implementation intentionally does nothing.
protected void gotMessage(MessageWithOffset msg) {
}
@SuppressWarnings("unchecked")
@Override
public void invoke(Collector<OUT> collector) throws Exception {
public void run(Collector<OUT> collector) throws Exception {
while (iterator.hasNext()) {
MessageWithOffset msg = iterator.nextWithOffset();
gotMessage(msg);
......@@ -69,6 +69,10 @@ public class SimpleKafkaSource<OUT> extends ConnectorSource<OUT> {
}
}
// No-op cancellation: this source keeps no cancellation flag or external
// resources to release here.
// NOTE(review): run() loops on iterator.hasNext() with no volatile running
// flag, so cancellation may not take effect promptly — TODO confirm.
@Override
public void cancel() {
}
@Override
public void open(Configuration config) {
......
......@@ -245,8 +245,6 @@ public class StreamGraph extends StreamingPlan {
* Id of the iteration tail
* @param iterationID
ID of iteration for multiple iterations
* @param parallelism
* Number of parallel instances created
* @param waitTime
* Max waiting time for next record
*/
......@@ -297,8 +295,6 @@ public class StreamGraph extends StreamingPlan {
* Name of the vertex
* @param vertexClass
* The class of the vertex
* @param invokableObjectject
* The user defined invokable object
* @param operatorName
* Type of the user defined operator
* @param parallelism
......@@ -419,8 +415,8 @@ public class StreamGraph extends StreamingPlan {
return this.bufferTimeouts.get(vertexID);
}
public void addOperatorState(Integer veretxName, String stateName, OperatorState<?> state) {
Map<String, OperatorState<?>> states = operatorStates.get(veretxName);
public void addOperatorState(Integer vertexName, String stateName, OperatorState<?> state) {
Map<String, OperatorState<?>> states = operatorStates.get(vertexName);
if (states == null) {
states = new HashMap<String, OperatorState<?>>();
states.put(stateName, state);
......@@ -432,7 +428,7 @@ public class StreamGraph extends StreamingPlan {
states.put(stateName, state);
}
}
operatorStates.put(veretxName, states);
operatorStates.put(vertexName, states);
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册