diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java index 754b2b35dd522083e9de8c71ed3eb2fec54ea9ec..d9b03c9dfae0cabddcce1f0e2f624c7c0d1e30be 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java @@ -19,10 +19,9 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.kafka.api.KafkaSource; -import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaCustomOffsetSource; -import org.apache.flink.streaming.connectors.kafka.api.simple.SimpleKafkaSource; +import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource; import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema; +import org.apache.flink.streaming.state.SimpleState; public class KafkaConsumerExample { @@ -43,7 +42,8 @@ public class KafkaConsumerExample { .addSource( // new KafkaSource(host + ":" + port, topic, new JavaDefaultStringSchema())) // new SimpleKafkaSource(topic, host, port, new JavaDefaultStringSchema())) - new KafkaCustomOffsetSource(topic, host, port, new JavaDefaultStringSchema())) + new PersistentKafkaSource(topic, host, port, 10L, new JavaDefaultStringSchema())) + .registerState("kafka", new SimpleState()) .setParallelism(3) .print().setParallelism(3); diff --git 
a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java index 1cd1192209da440651f690560eae26499273b19f..a17beb8274aebf0113e941739ea42973190f1f0b 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java @@ -42,7 +42,7 @@ public class KafkaProducerExample { DataStream stream1 = env.addSource(new SourceFunction() { @Override public void run(Collector collector) throws Exception { - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 20; i++) { collector.collect("message #" + i); Thread.sleep(100L); } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java index 3075608685b100994c6fe543dca436981059ce4e..43490818742bfcd8cd1d2119791f45088cfc3fc7 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java @@ -78,7 +78,7 @@ public class KafkaSource extends ConnectorSource { } public KafkaSource(String zookeeperHost, String topicId, - DeserializationSchema deserializationSchema, long zookeeperSyncTimeMillis){ + DeserializationSchema deserializationSchema, long zookeeperSyncTimeMillis){ this(zookeeperHost, topicId, DEFAULT_GROUP_ID, deserializationSchema, 
ZOOKEEPER_DEFAULT_SYNC_TIME); } diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java deleted file mode 100644 index d90ff7c3270d80441944598a79a9805eda30bb4f..0000000000000000000000000000000000000000 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaCustomOffsetSource.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.kafka.api.simple; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.connectors.util.DeserializationSchema; - -public class KafkaCustomOffsetSource extends SimpleKafkaSource { - - /** - * Partition index is set automatically by instance id. 
- * - * @param topicId - * @param host - * @param port - * @param deserializationSchema - */ - public KafkaCustomOffsetSource(String topicId, String host, int port, DeserializationSchema deserializationSchema) { - super(topicId, host, port, deserializationSchema); - } - - @Override - protected void setInitialOffset(Configuration config) { - iterator.initializeFromOffset(10); - } - - @Override - protected void gotMessage(MessageWithOffset msg) { - System.out.println(msg.getOffset() + " :: " + schema.deserialize(msg.getMessage())); - } -} diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java similarity index 98% rename from flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java rename to flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java index 9e12492a9a96859fd27c0759b8e049e7e10bae4a..f949b9aad716dbe69e9646427f9580236709223a 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicCreator.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicFactory.java @@ -26,7 +26,7 @@ import org.I0Itec.zkclient.serialize.ZkSerializer; import kafka.admin.AdminUtils; -public class KafkaTopicCreator { +public class KafkaTopicFactory { public static void createTopic(String zookeeperServer, String topicName, int numOfPartitions, int replicationFactor) { createTopic(zookeeperServer, topicName, numOfPartitions, replicationFactor, new Properties(), 10000, 10000); 
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java new file mode 100644 index 0000000000000000000000000000000000000000..00d003a2c8cb7b50db447fce4e1b43b92869f26c --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka.api.simple; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.util.DeserializationSchema; +import org.apache.flink.streaming.state.SimpleState; +import org.apache.flink.util.Collector; + +public class PersistentKafkaSource extends SimpleKafkaSource { + + private static final long NUM_RECORDS_PER_CHECKPOINT = 1000; + + private long initialOffset; + + private transient SimpleState kafkaOffSet; + private transient long checkpointCounter; + + /** + * Partition index is set automatically by instance id. + * + * @param topicId + * @param host + * @param port + * @param deserializationSchema + */ + public PersistentKafkaSource(String topicId, String host, int port, long initialOffset, DeserializationSchema deserializationSchema) { + super(topicId, host, port, deserializationSchema); + this.initialOffset = initialOffset; + } + + @Override + public void open(Configuration parameters) { + StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext(); + SimpleState lastKafkaOffSet = (SimpleState) context.getState("kafka"); + + if (lastKafkaOffSet.getState() == null){ + kafkaOffSet = new SimpleState(initialOffset); + } else { + kafkaOffSet = lastKafkaOffSet; + } + + checkpointCounter = 0; + super.open(parameters); + } + + @Override + protected void setInitialOffset(Configuration config) { + iterator.initializeFromOffset(kafkaOffSet.getState()); + } + + @Override + protected void gotMessage(MessageWithOffset msg) { + System.out.println(msg.getOffset() + " :: " + schema.deserialize(msg.getMessage())); + } + + @Override + public void run(Collector collector) throws Exception { + MessageWithOffset msg; + while (iterator.hasNext()) { + msg = iterator.nextWithOffset(); + gotMessage(msg); + OUT out = schema.deserialize(msg.getMessage()); + collector.collect(out); + if 
(++checkpointCounter > NUM_RECORDS_PER_CHECKPOINT){ + kafkaOffSet = new SimpleState(msg.getOffset()); + kafkaOffSet.checkpoint(); checkpointCounter = 0; + } + } + } +} diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java index a721dee351351250db820ca7a4ec5dcb680e181c..473585c59430b371d2724cdf81daaa91125295db 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/SimpleKafkaSource.java @@ -38,8 +38,7 @@ public class SimpleKafkaSource extends ConnectorSource { * @param port * @param deserializationSchema */ - public SimpleKafkaSource(String topicId, - String host, int port, DeserializationSchema deserializationSchema) { + public SimpleKafkaSource(String topicId, String host, int port, DeserializationSchema deserializationSchema) { super(deserializationSchema); this.topicId = topicId; this.host = host; @@ -55,12 +54,13 @@ public class SimpleKafkaSource extends ConnectorSource { iterator.initializeFromCurrent(); } + // This is just for debug purposes protected void gotMessage(MessageWithOffset msg) { } @SuppressWarnings("unchecked") @Override - public void invoke(Collector collector) throws Exception { + public void run(Collector collector) throws Exception { while (iterator.hasNext()) { MessageWithOffset msg = iterator.nextWithOffset(); gotMessage(msg); @@ -69,6 +69,10 @@ public class SimpleKafkaSource extends ConnectorSource { } } + @Override + public void cancel() { + } + @Override public void open(Configuration config) { diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index 640416d62e77eaba365fd0668b2e0d31db96d943..641708e8f83b9a9b22bce8dc13c476afb2e6258b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -245,8 +245,6 @@ public class StreamGraph extends StreamingPlan { * Id of the iteration tail * @param iterationID * ID of iteration for mulitple iterations - * @param parallelism - * Number of parallel instances created * @param waitTime * Max waiting time for next record */ @@ -297,8 +295,6 @@ public class StreamGraph extends StreamingPlan { * Name of the vertex * @param vertexClass * The class of the vertex - * @param invokableObjectject - * The user defined invokable object * @param operatorName * Type of the user defined operator * @param parallelism @@ -419,8 +415,8 @@ public class StreamGraph extends StreamingPlan { return this.bufferTimeouts.get(vertexID); } - public void addOperatorState(Integer veretxName, String stateName, OperatorState state) { - Map> states = operatorStates.get(veretxName); + public void addOperatorState(Integer vertexName, String stateName, OperatorState state) { + Map> states = operatorStates.get(vertexName); if (states == null) { states = new HashMap>(); states.put(stateName, state); @@ -432,7 +428,7 @@ public class StreamGraph extends StreamingPlan { states.put(stateName, state); } } - operatorStates.put(veretxName, states); + operatorStates.put(vertexName, states); } /**