From ce0c2901f9cb319e67993530ff2a8acc8d654880 Mon Sep 17 00:00:00 2001
From: Gyula Fora
Date: Sat, 28 Feb 2015 21:02:07 +0100
Subject: [PATCH] [streaming] Several minor cleanups

---
 .../kafka/KafkaConsumerExample.java           |  2 +-
 .../kafka/KafkaProducerExample.java           |  2 +-
 .../connectors/kafka/api/KafkaSink.java       | 12 +++---
 .../connectors/kafka/api/KafkaSource.java     | 40 ++++++++-----------
 .../kafka/config/EncoderWrapper.java          |  2 +-
 .../kafka/config/KafkaConfigWrapper.java      |  2 +-
 .../kafka/config/PartitionerWrapper.java      |  1 +
 .../partitioner/KafkaConstantPartitioner.java |  1 +
 .../KafkaDistributePartitioner.java           |  3 +-
 .../kafka/StringSerializerTest.java           | 14 +++++--
 .../flink/streaming/api/StreamGraph.java      |  2 +-
 .../api/StreamingJobGraphGenerator.java       |  2 +-
 .../streaming/api/datastream/DataStream.java  |  2 +-
 .../StreamExecutionEnvironment.java           |  2 -
 .../api/streamvertex/StreamVertexTest.java    | 14 +++----
 15 files changed, 50 insertions(+), 51 deletions(-)

diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index c06bf36147b..20c9bd7d803 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -58,4 +58,4 @@ public class KafkaConsumerExample {
 		}
 	}
 
-}
\ No newline at end of file
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index c6fa39fc6bf..2c2bf80663c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -38,7 +38,7 @@ public class KafkaProducerExample {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
 
-		@SuppressWarnings("unused")
+		@SuppressWarnings({ "unused", "serial" })
 		DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
 			@Override
 			public void invoke(Collector<String> collector) throws Exception {
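[Note] The hunk above adds "serial" to the suppression list because the anonymous SourceFunction is Serializable yet declares no serialVersionUID. A minimal sketch of the same warning outside Flink; the Emitter interface is a made-up stand-in, not part of the patch:

    import java.io.Serializable;

    public class SerialWarningDemo {

        // Stand-in for Flink's SourceFunction, which also extends Serializable.
        interface Emitter extends Serializable {
            void emit();
        }

        public static void main(String[] args) {
            // The anonymous class below is Serializable without a serialVersionUID,
            // which triggers the "serial" lint; the annotation silences it.
            @SuppressWarnings("serial")
            Emitter emitter = new Emitter() {
                @Override
                public void emit() {
                    System.out.println("emitting");
                }
            };
            emitter.emit();
        }
    }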
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 1aca03ee98f..53244803555 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -19,6 +19,11 @@ package org.apache.flink.streaming.connectors.kafka.api;
 
 import java.util.Properties;
 
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.DefaultEncoder;
+
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.config.EncoderWrapper;
 import org.apache.flink.streaming.connectors.kafka.config.PartitionerWrapper;
@@ -26,12 +31,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaDistributePartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.connectors.util.SerializationSchema;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.DefaultEncoder;
-import kafka.utils.VerifiableProperties;
-
 /**
  * Sink that emits its inputs to a Kafka topic.
  *
@@ -105,7 +104,6 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 		partitionerWrapper.write(props);
 
 		ProducerConfig config = new ProducerConfig(props);
-		VerifiableProperties props1 = config.props();
 
 		producer = new Producer(config);
 		initDone = true;
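[Note] The second hunk drops a dead local: ProducerConfig already validates the properties, so the VerifiableProperties it exposes was never needed here. For context, a minimal sketch of the legacy Kafka 0.8 producer API that this sink drives; the broker address, topic, and payload are placeholder assumptions, not values from the patch:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ProducerSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092"); // placeholder broker
            props.put("serializer.class", "kafka.serializer.DefaultEncoder");

            // ProducerConfig validates the properties on construction.
            ProducerConfig config = new ProducerConfig(props);
            Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);

            // DefaultEncoder passes raw byte[] payloads through unchanged.
            producer.send(new KeyedMessage<byte[], byte[]>("myTopic", "hello".getBytes()));
            producer.close();
        }
    }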
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 38287356f6b..7a185bb8425 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -22,23 +22,23 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
-import org.apache.flink.util.Collector;
-
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.connectors.ConnectorSource;
+import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.util.Collector;
+
 /**
  * Source that listens to a Kafka topic.
- * 
+ *
 * @param <OUT>
- *            Type of the messages on the topic.
+ *        Type of the messages on the topic.
 */
public class KafkaSource<OUT> extends ConnectorSource<OUT> {
	private static final long serialVersionUID = 1L;
@@ -50,25 +50,20 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 	private transient ConsumerConnector consumer;
 	private transient ConsumerIterator<byte[], byte[]> consumerIterator;
 
-	private int partitionIndex;
-	private int numberOfInstances;
-
 	private long zookeeperSyncTimeMillis;
 	private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
 
-	private OUT outTuple;
-
 	/**
 	 * Creates a KafkaSource that consumes a topic.
-	 * 
+	 *
 	 * @param zookeeperHost
-	 *            Address of the Zookeeper host (with port number).
+	 *        Address of the Zookeeper host (with port number).
 	 * @param topicId
-	 *            ID of the Kafka topic.
+	 *        ID of the Kafka topic.
 	 * @param deserializationSchema
-	 *            User defined deserialization schema.
+	 *        User defined deserialization schema.
 	 * @param zookeeperSyncTimeMillis
-	 *            Synchronization time with zookeeper.
+	 *        Synchronization time with zookeeper.
 	 */
 	public KafkaSource(String zookeeperHost, String topicId,
 			DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
@@ -96,10 +91,9 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 		props.put("auto.commit.interval.ms", "1000");
 
 		consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
-		partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
-		numberOfInstances = getRuntimeContext().getNumberOfParallelSubtasks();
 
-		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(Collections.singletonMap(topicId, 1));
+		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
+				.createMessageStreams(Collections.singletonMap(topicId, 1));
 		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicId);
 		KafkaStream<byte[], byte[]> stream = streams.get(0);
 
@@ -108,9 +102,9 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
 
 	/**
 	 * Called to forward the data from the source to the {@link DataStream}.
-	 * 
+	 *
 	 * @param collector
-	 *            The Collector for sending data to the dataStream
+	 *        The Collector for sending data to the dataStream
 	 */
 	@Override
 	public void invoke(Collector<OUT> collector) throws Exception {
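[Note] The open() hunk above wires up Kafka's high-level consumer and drops two fields that were assigned but never read. A minimal standalone sketch of that consumer API; the ZooKeeper address, group id, and topic name are placeholder assumptions:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class HighLevelConsumerSketch {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder address
            props.put("group.id", "example-group");           // placeholder group
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");

            ConsumerConnector consumer = Consumer
                    .createJavaConsumerConnector(new ConsumerConfig(props));

            // Ask for a single stream for the topic, as KafkaSource.open() does.
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                    .createMessageStreams(Collections.singletonMap("myTopic", 1));
            ConsumerIterator<byte[], byte[]> it = consumerMap.get("myTopic").get(0).iterator();

            // Blocks until messages arrive, then prints each payload.
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        }
    }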
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/config/EncoderWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/config/EncoderWrapper.java
index 288c2c14683..52ea319b8df 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/config/EncoderWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/config/EncoderWrapper.java
@@ -43,4 +43,4 @@ public class EncoderWrapper extends KafkaConfigWrapper {
 		properties.put(getClass().getCanonicalName(), stringSerializer.serialize(wrapped));
 	}
 
-}
\ No newline at end of file
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/config/PartitionerWrapper.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/config/PartitionerWrapper.java
index 4c84ce1c9b6..bfeed1005b5 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/config/PartitionerWrapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/config/PartitionerWrapper.java
@@ -38,6 +38,7 @@ public class PartitionerWrapper extends KafkaConfigWrapper implements KafkaPartitioner {
 
+	private static final long serialVersionUID = 1L;
 	private int partition;
 
 	public KafkaConstantPartitioner(int partition) {
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaDistributePartitioner.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaDistributePartitioner.java
index 6f92e4e9124..480aa17309f 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaDistributePartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaDistributePartitioner.java
@@ -26,6 +26,7 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
  */
 public class KafkaDistributePartitioner implements KafkaPartitioner {
 
+	private static final long serialVersionUID = 1L;
 	int currentPartition;
 
 	public KafkaDistributePartitioner() {
@@ -37,4 +38,4 @@ public class KafkaDistributePartitioner implements KafkaPartitioner {
 		return currentPartition++ % numberOfPartitions;
 	}
 
-}
\ No newline at end of file
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/StringSerializerTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/StringSerializerTest.java
index b0b31cb89c5..0112927bac3 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/StringSerializerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/StringSerializerTest.java
@@ -27,6 +27,8 @@ import org.junit.Test;
 public class StringSerializerTest {
 
 	private static class MyClass implements Serializable {
+
+		private static final long serialVersionUID = 1L;
 		private int a;
 		private String b;
 
@@ -37,11 +39,15 @@ public class StringSerializerTest {
 
 		@Override
 		public boolean equals(Object o) {
-			try {
-				MyClass other = (MyClass) o;
-				return a == other.a && b.equals(other.b);
-			} catch (ClassCastException e) {
+			if (o == null) {
 				return false;
+			} else {
+				try {
+					MyClass other = (MyClass) o;
+					return a == other.a && b.equals(other.b);
+				} catch (ClassCastException e) {
+					return false;
+				}
 			}
 		}
 	}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 0f5ea54bb45..f6a63a780e0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -772,4 +772,4 @@ public class StreamGraph extends StreamingPlan {
 	}
 
 
-}
\ No newline at end of file
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index 8ec848675bb..b999c27e0b9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -239,7 +239,7 @@ public class StreamingJobGraphGenerator {
 		AbstractJobVertex downStreamVertex = streamVertices.get(downStreamvertexID);
 		StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
 
-		StreamConfig upStreamConfig = headOfChain == upStreamvertexID ? new StreamConfig(
+		StreamConfig upStreamConfig = headOfChain.equals(upStreamvertexID) ? new StreamConfig(
 				headVertex.getConfiguration()) : chainedConfigs.get(headOfChain).get(
 				upStreamvertexID);
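[Note] The StreamingJobGraphGenerator change above is a real bug fix, not cosmetics: headOfChain and upStreamvertexID are boxed Integers, so == compares object identity and only appears to work for values inside the autoboxing cache. A small demonstration in plain Java, nothing Flink-specific:

    public class IntegerEqualityDemo {

        public static void main(String[] args) {
            Integer small = 127, sameSmall = 127;
            Integer big = 1000, sameBig = 1000;

            // Integers in [-128, 127] are cached, so == happens to be true here.
            System.out.println(small == sameSmall);  // true
            // Outside the cache, autoboxing yields distinct objects.
            System.out.println(big == sameBig);      // false
            System.out.println(big.equals(sameBig)); // true
        }
    }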
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 46b71a5736b..f7b5f0737f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -1262,7 +1262,7 @@ public class DataStream<OUT> {
 
 	private void validateMerge(Integer id) {
 		for (DataStream<OUT> ds : this.mergedStreams) {
-			if (ds.getId() == id) {
+			if (ds.getId().equals(id)) {
 				throw new RuntimeException("A DataStream cannot be merged with itself");
 			}
 		}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b43ae374afb..835ce4e08b7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -73,8 +73,6 @@ public abstract class StreamExecutionEnvironment {
 
 	protected StreamGraph streamGraph;
 
-	private static StreamExecutionEnvironmentFactory executionEnvironmentFactory;
-
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
index b103d84897d..4f01a8b4821 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamvertex/StreamVertexTest.java
@@ -21,9 +21,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -87,7 +89,6 @@ public class StreamVertexTest {
 		LocalStreamEnvironment env = StreamExecutionEnvironment
 				.createLocalEnvironment(SOURCE_PARALELISM);
 
-
 		try {
 			env.fromCollection(null);
 			fail();
@@ -133,14 +134,13 @@ public class StreamVertexTest {
 		}
 	}
 
-	static HashSet<String> resultSet;
-
 	private static class SetSink implements SinkFunction<String> {
 		private static final long serialVersionUID = 1L;
+		public static Set<String> result = Collections.synchronizedSet(new HashSet<String>());
 
 		@Override
 		public void invoke(String value) {
-			resultSet.add(value);
+			result.add(value);
 		}
 	}
 
@@ -153,19 +153,19 @@ public class StreamVertexTest {
 		fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
 
-		resultSet = new HashSet<String>();
 		env.execute();
 
 		HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0",
 				"1", "2", "3"));
-		assertEquals(expectedSet, resultSet);
+		assertEquals(expectedSet, SetSink.result);
 	}
 
 	@Test
 	public void runStream() throws Exception {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
 
-		env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
+		env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask())
+				.addSink(new MySink());
 
 		env.execute();
 
 		assertEquals(10, data.keySet().size());
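[Note] The test cleanup above moves the result set into the sink as a static synchronized set: in local execution every parallel sink copy lives in the same JVM, so they all see one static field, and the synchronized wrapper makes concurrent add() calls safe. A minimal sketch of that pattern, with plain threads standing in for sink subtasks:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class SharedSinkStateDemo {

        // One set per JVM, shared by every "subtask"; the synchronized
        // wrapper makes concurrent add() calls safe.
        static final Set<String> RESULT = Collections.synchronizedSet(new HashSet<String>());

        public static void main(String[] args) throws InterruptedException {
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    RESULT.add("from-subtask-1");
                }
            });
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    RESULT.add("from-subtask-2");
                }
            });
            t1.start();
            t2.start();
            t1.join();
            t2.join();
            System.out.println(RESULT); // both elements present, order unspecified
        }
    }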
-- 
GitLab