Commit ce0c2901 authored by Gyula Fora

[streaming] Several minor cleanups

Parent: b7b547d8
......@@ -38,7 +38,7 @@ public class KafkaProducerExample {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment().setDegreeOfParallelism(4);
@SuppressWarnings("unused")
@SuppressWarnings({ "unused", "serial" })
DataStream<String> stream1 = env.addSource(new SourceFunction<String>() {
@Override
public void invoke(Collector<String> collector) throws Exception {
......
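The widened suppression adds "serial": the anonymous `SourceFunction` implements a `Serializable` interface but, being anonymous, cannot declare a `serialVersionUID`, which triggers the compiler's "serial" warning. A minimal standalone sketch of the pattern; the names below are illustrative, not from the commit:

```java
import java.io.Serializable;

public class SerialWarningSketch {
    // Hypothetical Serializable callback, standing in for SourceFunction.
    interface Callback extends Serializable {
        void invoke() throws Exception;
    }

    // "serial" is needed because an anonymous class has no place to declare
    // serialVersionUID; "unused" mirrors the commit's original suppression.
    @SuppressWarnings({ "unused", "serial" })
    private static final Callback CALLBACK = new Callback() {
        @Override
        public void invoke() throws Exception {
            System.out.println("hello");
        }
    };

    public static void main(String[] args) throws Exception {
        CALLBACK.invoke();
    }
}
```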
......@@ -19,6 +19,11 @@ package org.apache.flink.streaming.connectors.kafka.api;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultEncoder;
import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.config.EncoderWrapper;
import org.apache.flink.streaming.connectors.kafka.config.PartitionerWrapper;
......@@ -26,12 +31,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaDistributePa
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.connectors.util.SerializationSchema;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.DefaultEncoder;
import kafka.utils.VerifiableProperties;
/**
* Sink that emits its inputs to a Kafka topic.
*
......@@ -105,7 +104,6 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
partitionerWrapper.write(props);
ProducerConfig config = new ProducerConfig(props);
VerifiableProperties props1 = config.props();
producer = new Producer<IN, byte[]>(config);
initDone = true;
......
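The removed `VerifiableProperties props1` local was dead code: `ProducerConfig` already wraps and validates the properties internally, and only the config itself is handed to the producer. For context, a minimal sketch of the legacy Kafka 0.8 producer setup this sink relies on; the broker address, topic, and serializer settings below are assumptions for illustration:

```java
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");                 // assumed broker
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");    // byte[] values
        props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // String keys

        // ProducerConfig validates the properties itself, which is why the
        // extra VerifiableProperties local removed by this commit was unused.
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, byte[]> producer = new Producer<String, byte[]>(config);

        producer.send(new KeyedMessage<String, byte[]>("demo-topic", "key",
                "hello".getBytes()));
        producer.close();
    }
}
```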
......@@ -22,23 +22,23 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
import org.apache.flink.util.Collector;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.ConnectorSource;
import org.apache.flink.streaming.connectors.util.DeserializationSchema;
import org.apache.flink.util.Collector;
/**
* Source that listens to a Kafka topic.
*
*
* @param <OUT>
* Type of the messages on the topic.
* Type of the messages on the topic.
*/
public class KafkaSource<OUT> extends ConnectorSource<OUT> {
private static final long serialVersionUID = 1L;
......@@ -50,25 +50,20 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
private transient ConsumerConnector consumer;
private transient ConsumerIterator<byte[], byte[]> consumerIterator;
private int partitionIndex;
private int numberOfInstances;
private long zookeeperSyncTimeMillis;
private static final long ZOOKEEPER_DEFAULT_SYNC_TIME = 200;
private OUT outTuple;
/**
* Creates a KafkaSource that consumes a topic.
*
*
* @param zookeeperHost
* Address of the Zookeeper host (with port number).
* Address of the Zookeeper host (with port number).
* @param topicId
* ID of the Kafka topic.
* ID of the Kafka topic.
* @param deserializationSchema
* User defined deserialization schema.
* User defined deserialization schema.
* @param zookeeperSyncTimeMillis
* Synchronization time with zookeeper.
* Synchronization time with zookeeper.
*/
public KafkaSource(String zookeeperHost, String topicId,
DeserializationSchema<OUT> deserializationSchema, long zookeeperSyncTimeMillis) {
......@@ -96,10 +91,9 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
props.put("auto.commit.interval.ms", "1000");
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
partitionIndex = getRuntimeContext().getIndexOfThisSubtask();
numberOfInstances = getRuntimeContext().getNumberOfParallelSubtasks();
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(Collections.singletonMap(topicId, 1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
.createMessageStreams(Collections.singletonMap(topicId, 1));
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicId);
KafkaStream<byte[], byte[]> stream = streams.get(0);
......@@ -108,9 +102,9 @@ public class KafkaSource<OUT> extends ConnectorSource<OUT> {
/**
* Called to forward the data from the source to the {@link DataStream}.
*
*
* @param collector
* The Collector for sending data to the dataStream
* The Collector for sending data to the dataStream
*/
@Override
public void invoke(Collector<OUT> collector) throws Exception {
......
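For context, a hedged sketch of the Kafka 0.8 high-level consumer flow that `KafkaSource` wraps in `open()` and `invoke()`; the ZooKeeper address, group id, and topic below are illustrative assumptions:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumed ZooKeeper host
        props.put("group.id", "flink-sketch");            // assumed consumer group
        props.put("zookeeper.sync.time.ms", "200");       // matches the source's default
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream per topic, exactly as KafkaSource requests it.
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumer.createMessageStreams(Collections.singletonMap("demo-topic", 1));
        ConsumerIterator<byte[], byte[]> it =
                consumerMap.get("demo-topic").get(0).iterator();

        while (it.hasNext()) {
            byte[] message = it.next().message(); // raw bytes, deserialized downstream
            System.out.println(new String(message));
        }
    }
}
```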
......@@ -43,4 +43,4 @@ public class EncoderWrapper<T> extends KafkaConfigWrapper<SerializationSchema<T,
return wrapped.serialize(element);
}
}
\ No newline at end of file
}
......@@ -57,4 +57,4 @@ public abstract class KafkaConfigWrapper<T extends Serializable> {
properties.put(getClass().getCanonicalName(), stringSerializer.serialize(wrapped));
}
}
\ No newline at end of file
}
......@@ -38,6 +38,7 @@ public class PartitionerWrapper<T> extends KafkaConfigWrapper<KafkaPartitioner<T
super(properties);
}
@SuppressWarnings("unchecked")
@Override
public int partition(Object key, int numPartitions) {
return wrapped.partition((T) key, numPartitions);
......
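The new `@SuppressWarnings("unchecked")` covers the `(T) key` cast, which the runtime cannot verify because generic type parameters are erased. A small sketch of the same pattern:

```java
public class ErasureSketch<T> {
    @SuppressWarnings("unchecked")
    T castKey(Object key) {
        return (T) key; // compiles to a plain Object cast after erasure
    }

    public static void main(String[] args) {
        ErasureSketch<String> sketch = new ErasureSketch<String>();
        System.out.println(sketch.castKey("ok")); // fine
        // A key of the wrong type would only fail later, at the use site.
    }
}
```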
......@@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
public class KafkaConstantPartitioner<T> implements KafkaPartitioner<T> {
private static final long serialVersionUID = 1L;
private int partition;
public KafkaConstantPartitioner(int partition) {
......
......@@ -26,6 +26,7 @@ package org.apache.flink.streaming.connectors.kafka.partitioner;
*/
public class KafkaDistributePartitioner<T> implements KafkaPartitioner<T> {
private static final long serialVersionUID = 1L;
int currentPartition;
public KafkaDistributePartitioner() {
......@@ -37,4 +38,4 @@ public class KafkaDistributePartitioner<T> implements KafkaPartitioner<T> {
return currentPartition++ % numberOfPartitions;
}
}
\ No newline at end of file
}
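The partitioner spreads messages round-robin via `currentPartition++ % numberOfPartitions`. A hedged sketch of the same idea; keeping the counter bounded, as below, is an illustrative variation (not in the commit) that sidesteps negative results once a plain `int` counter overflows, since Java's `%` can return negative values for negative operands:

```java
public class RoundRobinSketch {
    private int current = 0;

    int nextPartition(int numberOfPartitions) {
        int p = current;
        current = (current + 1) % numberOfPartitions; // stays within [0, n)
        return p;
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        for (int i = 0; i < 5; i++) {
            System.out.println(rr.nextPartition(3)); // 0 1 2 0 1
        }
    }
}
```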
......@@ -27,6 +27,8 @@ import org.junit.Test;
public class StringSerializerTest {
private static class MyClass implements Serializable {
private static final long serialVersionUID = 1L;
private int a;
private String b;
......@@ -37,11 +39,15 @@ public class StringSerializerTest {
@Override
public boolean equals(Object o) {
try {
MyClass other = (MyClass) o;
return a == other.a && b.equals(other.b);
} catch (ClassCastException e) {
if (o == null) {
return false;
} else {
try {
MyClass other = (MyClass) o;
return a == other.a && b.equals(other.b);
} catch (ClassCastException e) {
return false;
}
}
}
}
......
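The rewritten `equals()` fixes a genuine contract violation: casting `null` to `MyClass` succeeds, so the old code reached `other.a` and threw `NullPointerException` instead of returning `false` for `equals(null)`. A compact sketch of the fixed pattern:

```java
public class EqualsNullSketch {
    static class MyClass {
        int a = 1;
        String b = "x";

        @Override
        public boolean equals(Object o) {
            if (o == null) {
                return false; // required by the Object.equals contract
            }
            try {
                MyClass other = (MyClass) o;
                return a == other.a && b.equals(other.b);
            } catch (ClassCastException e) {
                return false;
            }
        }

        @Override
        public int hashCode() {
            return 31 * a + b.hashCode();
        }
    }

    public static void main(String[] args) {
        MyClass m = new MyClass();
        System.out.println(m.equals(null));          // false, no NPE
        System.out.println(m.equals(new MyClass())); // true
        System.out.println(m.equals("other type"));  // false via ClassCastException
    }
}
```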
......@@ -772,4 +772,4 @@ public class StreamGraph extends StreamingPlan {
}
}
\ No newline at end of file
}
......@@ -239,7 +239,7 @@ public class StreamingJobGraphGenerator {
AbstractJobVertex downStreamVertex = streamVertices.get(downStreamvertexID);
StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
StreamConfig upStreamConfig = headOfChain == upStreamvertexID ? new StreamConfig(
StreamConfig upStreamConfig = headOfChain.equals(upStreamvertexID) ? new StreamConfig(
headVertex.getConfiguration()) : chainedConfigs.get(headOfChain).get(
upStreamvertexID);
......
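`headOfChain` and `upStreamvertexID` are boxed `Integer`s, so `==` compares references and only happens to work inside the `Integer` cache (-128 to 127); vertex ids above 127 would spuriously compare unequal. The same fix appears in `DataStream.validateMerge` below. A short demonstration:

```java
public class IntegerEqualitySketch {
    public static void main(String[] args) {
        Integer small1 = Integer.valueOf(100), small2 = Integer.valueOf(100);
        Integer big1 = Integer.valueOf(1000), big2 = Integer.valueOf(1000);

        System.out.println(small1 == small2);  // true: cached instances
        System.out.println(big1 == big2);      // false on standard JVMs: distinct objects
        System.out.println(big1.equals(big2)); // true: value comparison
    }
}
```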
......@@ -1262,7 +1262,7 @@ public class DataStream<OUT> {
private void validateMerge(Integer id) {
for (DataStream<OUT> ds : this.mergedStreams) {
if (ds.getId() == id) {
if (ds.getId().equals(id)) {
throw new RuntimeException("A DataStream cannot be merged with itself");
}
}
......
......@@ -73,8 +73,6 @@ public abstract class StreamExecutionEnvironment {
protected StreamGraph streamGraph;
private static StreamExecutionEnvironmentFactory executionEnvironmentFactory;
// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------
......
......@@ -21,9 +21,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
......@@ -87,7 +89,6 @@ public class StreamVertexTest {
LocalStreamEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(SOURCE_PARALELISM);
try {
env.fromCollection(null);
fail();
......@@ -133,14 +134,13 @@ public class StreamVertexTest {
}
}
static HashSet<String> resultSet;
private static class SetSink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
public static Set<String> result = Collections.synchronizedSet(new HashSet<String>());
@Override
public void invoke(String value) {
resultSet.add(value);
result.add(value);
}
}
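Replacing the bare static `resultSet` with a synchronized static set makes the test safe when parallel sink instances, each a separate deserialized copy, write results concurrently from different threads. A hedged sketch of the pattern:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class SharedResultSketch {
    static final Set<String> RESULT =
            Collections.synchronizedSet(new HashSet<String>());

    public static void main(String[] args) throws InterruptedException {
        Runnable sink = new Runnable() { // stands in for a parallel SinkFunction
            @Override
            public void run() {
                RESULT.add(Thread.currentThread().getName());
            }
        };
        Thread t1 = new Thread(sink, "subtask-1");
        Thread t2 = new Thread(sink, "subtask-2");
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(RESULT); // both entries present, no lost updates
    }
}
```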
......@@ -153,19 +153,19 @@ public class StreamVertexTest {
fromStringElements.connect(generatedSequence).map(new CoMap()).addSink(new SetSink());
resultSet = new HashSet<String>();
env.execute();
HashSet<String> expectedSet = new HashSet<String>(Arrays.asList("aa", "bb", "cc", "0", "1",
"2", "3"));
assertEquals(expectedSet, resultSet);
assertEquals(expectedSet, SetSink.result);
}
@Test
public void runStream() throws Exception {
StreamExecutionEnvironment env = new TestStreamEnvironment(SOURCE_PARALELISM, MEMORYSIZE);
env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask()).addSink(new MySink());
env.addSource(new MySource()).setParallelism(SOURCE_PARALELISM).map(new MyTask())
.addSink(new MySink());
env.execute();
assertEquals(10, data.keySet().size());
......