提交 7238010e 编写于 作者: G ghermann 提交者: Stephan Ewen

[streaming] Fixed generics and added SuppressWarnings

上级 9d5bd601
......@@ -197,7 +197,6 @@ public class DataStream<T extends Tuple> {
* The DataStreams to connect output with.
* @return The connected DataStream.
*/
@SuppressWarnings("unchecked")
public DataStream<T> connectWith(DataStream<T>... streams) {
DataStream<T> returnStream = copy();
......
......@@ -184,7 +184,7 @@ public abstract class StreamExecutionEnvironment {
* type of the returned stream
* @return The DataStream representing the elements.
*/
public <X> DataStream<Tuple1<X>> fromElements(@SuppressWarnings("unchecked") X... data) {
public <X> DataStream<Tuple1<X>> fromElements(X... data) {
DataStream<Tuple1<X>> returnStream = new DataStream<Tuple1<X>>(this, "elements");
jobGraphBuilder.addSource(returnStream.getId(), new FromElementsFunction<X>(data),
......
......@@ -31,7 +31,6 @@ public class FromElementsFunction<T> extends SourceFunction<Tuple1<T>> {
Iterable<T> iterable;
Tuple1<T> outTuple = new Tuple1<T>();
@SuppressWarnings("unchecked")
public FromElementsFunction(T... elements) {
this.iterable = Arrays.asList(elements);
}
......
......@@ -178,7 +178,7 @@ public abstract class AbstractStreamComponent<IN extends Tuple, OUT extends Tupl
if (numberOfInputs < 2) {
return new StreamRecordReader<IN>(this, (Class<? extends StreamRecord<IN>>) StreamRecord.class, inDeserializationDelegate,
return new StreamRecordReader<IN>(this, StreamRecord.class, inDeserializationDelegate,
inTupleSerializer);
} else {
......
......@@ -22,7 +22,6 @@ package org.apache.flink.streaming.api.streamcomponent;
import java.io.IOException;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
......@@ -39,7 +38,8 @@ import org.apache.flink.runtime.io.network.gates.InputChannelResult;
public class StreamRecordReader<T extends Tuple> extends AbstractSingleGateRecordReader<StreamRecord<T>> implements
Reader<StreamRecord<T>> {
private final Class<? extends StreamRecord<T>> recordType;
@SuppressWarnings("rawtypes")
private final Class<? extends StreamRecord> recordType;
private DeserializationDelegate<T> deserializationDelegate;
private TupleSerializer<T> tupleSerializer;
/**
......@@ -68,7 +68,8 @@ public class StreamRecordReader<T extends Tuple> extends AbstractSingleGateRecor
* @param tupleSerializer
* tupleSerializer
*/
public StreamRecordReader(AbstractInvokable taskBase, Class<? extends StreamRecord<T>> recordType,
@SuppressWarnings("rawtypes")
public StreamRecordReader(AbstractInvokable taskBase, Class<? extends StreamRecord> recordType,
DeserializationDelegate<T> deserializationDelegate,
TupleSerializer<T> tupleSerializer) {
super(taskBase);
......@@ -153,6 +154,7 @@ public class StreamRecordReader<T extends Tuple> extends AbstractSingleGateRecor
return this.noMoreRecordsWillFollow;
}
@SuppressWarnings("unchecked")
private StreamRecord<T> instantiateRecordType() {
try {
return this.recordType.newInstance();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册