提交 73371101 编写于 作者: G Gyula Fora 提交者: mbalassi

[streaming] Streaming jobgraph and vertex refactor to match recent runtime changes

上级 7cc24006
......@@ -28,7 +28,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamcomponent.StreamComponentException;
import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
......@@ -46,7 +46,7 @@ public class StreamConfig {
private static final String DIRECTED_EMIT = "directedEmit";
private static final String FUNCTION_NAME = "operatorName";
private static final String FUNCTION = "operator";
private static final String COMPONENT_NAME = "componentName";
private static final String VERTEX_NAME = "vertexName";
private static final String SERIALIZEDUDF = "serializedudf";
private static final String USER_FUNCTION = "userfunction";
private static final String BUFFER_TIMEOUT = "bufferTimeout";
......@@ -125,8 +125,13 @@ public class StreamConfig {
TypeSerializerWrapper<T> typeWrapper = (TypeSerializerWrapper<T>) SerializationUtils
.deserialize(serializedWrapper);
if (typeWrapper != null) {
return typeWrapper.getTypeInfo();
} else {
return null;
}
return typeWrapper.getTypeInfo();
}
public void setMutability(boolean isMutable) {
......@@ -145,7 +150,7 @@ public class StreamConfig {
return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
}
public void setUserInvokable(StreamInvokable<?> invokableObject) {
public void setUserInvokable(StreamInvokable<?,?> invokableObject) {
if (invokableObject != null) {
config.setClass(USER_FUNCTION, invokableObject.getClass());
......@@ -162,16 +167,16 @@ public class StreamConfig {
try {
return deserializeObject(config.getBytes(SERIALIZEDUDF, null));
} catch (Exception e) {
throw new StreamComponentException("Cannot instantiate user function", e);
throw new StreamVertexException("Cannot instantiate user function", e);
}
}
public void setComponentName(String componentName) {
config.setString(COMPONENT_NAME, componentName);
public void setVertexName(String vertexName) {
config.setString(VERTEX_NAME, vertexName);
}
public String getComponentName() {
return config.getString(COMPONENT_NAME, null);
public String getVertexName() {
return config.getString(VERTEX_NAME, null);
}
public void setFunction(byte[] serializedFunction, String functionName) {
......@@ -212,7 +217,7 @@ public class StreamConfig {
try {
return deserializeObject(config.getBytes(OUTPUT_SELECTOR, null));
} catch (Exception e) {
throw new StreamComponentException("Cannot deserialize and instantiate OutputSelector",
throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector",
e);
}
}
......
......@@ -26,7 +26,7 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable;
......@@ -200,7 +200,7 @@ public class BatchedDataStream<OUT> {
}
private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
StreamOperatorInvokable<OUT, OUT> invokable = getReduceInvokable(aggregate);
StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregate);
SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.addFunction("batchReduce",
aggregate, dataStream.outTypeWrapper, dataStream.outTypeWrapper, invokable);
......
......@@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.function.sink.WriteFormatAsText;
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches;
import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis;
import org.apache.flink.streaming.api.invokable.SinkInvokable;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FilterInvokable;
import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable;
......@@ -558,7 +558,7 @@ public class DataStream<OUT> {
public SingleOutputStreamOperator<OUT, ?> max() {
return max(0);
}
/**
* Applies an aggregation that gives the count of the data point.
*
......@@ -568,15 +568,16 @@ public class DataStream<OUT> {
TypeSerializerWrapper<OUT> inTypeWrapper = outTypeWrapper;
TypeSerializerWrapper<Long> outTypeWrapper = new ObjectTypeWrapper<Long>(new Long(0));
return addFunction("counter", null, inTypeWrapper, outTypeWrapper, new CounterInvokable<OUT>());
return addFunction("counter", null, inTypeWrapper, outTypeWrapper,
new CounterInvokable<OUT>());
}
protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) {
StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate, outTypeWrapper,
outTypeWrapper, invokable);
SingleOutputStreamOperator<OUT, ?> returnStream = addFunction("reduce", aggregate,
outTypeWrapper, outTypeWrapper, invokable);
return returnStream;
}
......@@ -759,7 +760,8 @@ public class DataStream<OUT> {
private DataStreamSink<OUT> writeAsText(DataStream<OUT> inputStream, String path,
WriteFormatAsText<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
......@@ -909,7 +911,8 @@ public class DataStream<OUT> {
private DataStreamSink<OUT> writeAsCsv(DataStream<OUT> inputStream, String path,
WriteFormatAsCsv<OUT> format, int batchSize, OUT endTuple) {
DataStreamSink<OUT> returnStream = addSink(inputStream,
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple), inputStream.outTypeWrapper);
new WriteSinkFunctionByBatches<OUT>(path, format, batchSize, endTuple),
inputStream.outTypeWrapper);
jobGraphBuilder.setMutability(returnStream.getId(), false);
return returnStream;
}
......@@ -944,7 +947,7 @@ public class DataStream<OUT> {
DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null);
jobGraphBuilder.addIterationSource(returnStream.getId(), this.getId(), iterationID,
jobGraphBuilder.addIterationHead(returnStream.getId(), this.getId(), iterationID,
degreeOfParallelism, waitTime);
return this.copy();
......@@ -966,15 +969,14 @@ public class DataStream<OUT> {
*/
protected <R> SingleOutputStreamOperator<R, ?> addFunction(String functionName,
final Function function, TypeSerializerWrapper<OUT> inTypeWrapper,
TypeSerializerWrapper<R> outTypeWrapper,
StreamOperatorInvokable<OUT, R> functionInvokable) {
TypeSerializerWrapper<R> outTypeWrapper, StreamInvokable<OUT, R> functionInvokable) {
DataStream<OUT> inputStream = this.copy();
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment,
functionName, outTypeWrapper);
try {
jobGraphBuilder.addTask(returnStream.getId(), functionInvokable, inTypeWrapper,
jobGraphBuilder.addStreamVertex(returnStream.getId(), functionInvokable, inTypeWrapper,
outTypeWrapper, functionName,
SerializationUtils.serialize((Serializable) function), degreeOfParallelism);
} catch (SerializationException e) {
......@@ -1049,13 +1051,13 @@ public class DataStream<OUT> {
}
private DataStreamSink<OUT> addSink(DataStream<OUT> inputStream,
SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> typeWrapper) {
SinkFunction<OUT> sinkFunction, TypeSerializerWrapper<OUT> inTypeWrapper) {
DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink",
outTypeWrapper);
try {
jobGraphBuilder.addSink(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
typeWrapper, "sink", SerializationUtils.serialize(sinkFunction),
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(sinkFunction),
inTypeWrapper, null, "sink", SerializationUtils.serialize(sinkFunction),
degreeOfParallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SinkFunction");
......
......@@ -82,7 +82,7 @@ public class IterativeDataStream<IN> extends
public <R> DataStream<IN> closeWith(DataStream<IN> iterationTail, String iterationName) {
DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink", null);
jobGraphBuilder.addIterationSink(returnStream.getId(), iterationTail.getId(),
jobGraphBuilder.addIterationTail(returnStream.getId(), iterationTail.getId(),
iterationID.toString(), iterationTail.getParallelism(), waitTime);
jobGraphBuilder.setIterationSourceSettings(iterationID.toString(), iterationTail.getId());
......
......@@ -230,8 +230,8 @@ public abstract class StreamExecutionEnvironment {
try {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
outTypeWrapper, "source", SerializationUtils.serialize(function), 1);
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
null, outTypeWrapper, "source", SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize elements");
}
......@@ -267,8 +267,8 @@ public abstract class StreamExecutionEnvironment {
try {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(
new FromElementsFunction<OUT>(data)), new ObjectTypeWrapper<OUT>(data
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(
new FromElementsFunction<OUT>(data)), null, new ObjectTypeWrapper<OUT>(data
.iterator().next()), "source", SerializationUtils.serialize(function), 1);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize collection");
......@@ -311,8 +311,9 @@ public abstract class StreamExecutionEnvironment {
outTypeWrapper);
try {
jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
outTypeWrapper, "source", SerializationUtils.serialize(function), parallelism);
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SourceInvokable<OUT>(function),
null, outTypeWrapper, "source", SerializationUtils.serialize(function),
parallelism);
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize SourceFunction");
}
......@@ -454,7 +455,8 @@ public abstract class StreamExecutionEnvironment {
* <p>
* The program execution will be logged and displayed with a generated
* default name.
* @throws Exception
*
* @throws Exception
**/
public abstract void execute() throws Exception;
......
......@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.invokable;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
public class SinkInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
public class SinkInvokable<IN> extends StreamInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
private SinkFunction<IN> sinkFunction;
......
......@@ -21,7 +21,7 @@ import java.io.Serializable;
import org.apache.flink.streaming.api.function.source.SourceFunction;
public class SourceInvokable<OUT> extends StreamInvokable<OUT> implements Serializable {
public class SourceInvokable<OUT> extends StreamInvokable<OUT,OUT> implements Serializable {
private static final long serialVersionUID = 1L;
......@@ -38,4 +38,16 @@ public class SourceInvokable<OUT> extends StreamInvokable<OUT> implements Serial
sourceFunction.invoke(collector);
}
@Override
protected void immutableInvoke() throws Exception {
}
@Override
protected void mutableInvoke() throws Exception {
}
@Override
protected void callUserFunction() throws Exception {
}
}
......@@ -22,18 +22,30 @@ import java.io.Serializable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The StreamInvokable represents the base class for all invokables in
* the streaming topology.
* The StreamInvokable represents the base class for all invokables in the
* streaming topology.
*
* @param <OUT>
* The output type of the invokable
*/
public abstract class StreamInvokable<OUT> implements Serializable {
public abstract class StreamInvokable<IN, OUT> implements Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(StreamInvokable.class);
protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
protected StreamRecordSerializer<IN> serializer;
protected StreamRecord<IN> reuse;
protected boolean isMutable;
protected Collector<OUT> collector;
protected Function userFunction;
......@@ -43,8 +55,79 @@ public abstract class StreamInvokable<OUT> implements Serializable {
this.userFunction = userFunction;
}
public void setCollector(Collector<OUT> collector) {
/**
* Initializes the {@link StreamOperatorInvokable} for input and output
* handling
*
* @param collector
* Collector object for collecting the outputs for the operator
* @param recordIterator
* Iterator for reading in the input records
* @param serializer
* Serializer used to deserialize inputs
* @param isMutable
* Mutability setting for the operator
*/
public void initialize(Collector<OUT> collector,
MutableObjectIterator<StreamRecord<IN>> recordIterator,
StreamRecordSerializer<IN> serializer, boolean isMutable) {
this.collector = collector;
this.recordIterator = recordIterator;
this.serializer = serializer;
if(this.serializer != null){
this.reuse = serializer.createInstance();
}
this.isMutable = isMutable;
}
/**
* Re-initializes the object in which the next input record will be read in
*/
protected void resetReuse() {
this.reuse = serializer.createInstance();
}
/**
* Method that will be called if the mutability setting is set to immutable
*/
protected abstract void immutableInvoke() throws Exception;
/**
* Method that will be called if the mutability setting is set to mutable
*/
protected abstract void mutableInvoke() throws Exception;
/**
* The call of the user implemented function should be implemented here
*/
protected abstract void callUserFunction() throws Exception;
/**
* Method for logging exceptions thrown during the user function call
*/
protected void callUserFunctionAndLogException() {
try {
callUserFunction();
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Calling user function failed due to: {}",
StringUtils.stringifyException(e));
}
}
}
/**
* Method that will be called when the stream starts. The user should encode
* the processing functionality in {@link #mutableInvoke()} and
* {@link #immutableInvoke()}
*
*/
public void invoke() throws Exception {
if (this.isMutable) {
mutableInvoke();
} else {
immutableInvoke();
}
}
/**
......@@ -55,7 +138,7 @@ public abstract class StreamInvokable<OUT> implements Serializable {
* The configuration parameters for the operator
*/
public void open(Configuration parameters) throws Exception {
isRunning=true;
isRunning = true;
if (userFunction instanceof RichFunction) {
((RichFunction) userFunction).open(parameters);
}
......@@ -72,11 +155,4 @@ public abstract class StreamInvokable<OUT> implements Serializable {
((RichFunction) userFunction).close();
}
}
/**
* The method that will be called once when the operator is created, the
* working mechanics of the operator should be implemented here
*
*/
public abstract void invoke() throws Exception;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.invokable;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The StreamOperatorInvokable represents the base class for all operators in
* the streaming topology.
*
* @param <IN>
* Input type of the operator
* @param <OUT>
* Output type of the operator
*/
public abstract class StreamOperatorInvokable<IN, OUT> extends StreamInvokable<OUT> {
public StreamOperatorInvokable(Function userFunction) {
super(userFunction);
}
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(StreamOperatorInvokable.class);
protected MutableObjectIterator<StreamRecord<IN>> recordIterator;
protected StreamRecordSerializer<IN> serializer;
protected StreamRecord<IN> reuse;
protected boolean isMutable;
/**
* Initializes the {@link StreamOperatorInvokable} for input and output
* handling
*
* @param collector
* Collector object for collecting the outputs for the operator
* @param recordIterator
* Iterator for reading in the input records
* @param serializer
* Serializer used to deserialize inputs
* @param isMutable
* Mutability setting for the operator
*/
public void initialize(Collector<OUT> collector,
MutableObjectIterator<StreamRecord<IN>> recordIterator,
StreamRecordSerializer<IN> serializer, boolean isMutable) {
setCollector(collector);
this.recordIterator = recordIterator;
this.serializer = serializer;
this.reuse = serializer.createInstance();
this.isMutable = isMutable;
}
/**
* Re-initializes the object in which the next input record will be read in
*/
protected void resetReuse() {
this.reuse = serializer.createInstance();
}
/**
* Method that will be called if the mutability setting is set to immutable
*/
protected abstract void immutableInvoke() throws Exception;
/**
* Method that will be called if the mutability setting is set to mutable
*/
protected abstract void mutableInvoke() throws Exception;
/**
* The call of the user implemented function should be implemented here
*/
protected abstract void callUserFunction() throws Exception;
/**
* Method for logging exceptions thrown during the user function call
*/
protected void callUserFunctionAndLogException() {
try {
callUserFunction();
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Calling user function failed due to: {}",
StringUtils.stringifyException(e));
}
}
}
@Override
public void invoke() throws Exception {
if (this.isMutable) {
mutableInvoke();
} else {
immutableInvoke();
}
}
}
......@@ -24,11 +24,11 @@ import java.util.Iterator;
import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.SlidingWindowState;
public class BatchGroupReduceInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
public class BatchGroupReduceInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
protected GroupReduceFunction<IN, OUT> reducer;
......
......@@ -24,11 +24,11 @@ import org.apache.commons.math.util.MathUtils;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.state.NullableCircularBuffer;
public class BatchReduceInvokable<OUT> extends StreamOperatorInvokable<OUT, OUT> {
public class BatchReduceInvokable<OUT> extends StreamInvokable<OUT, OUT> {
private static final long serialVersionUID = 1L;
protected ReduceFunction<OUT> reducer;
......
......@@ -17,9 +17,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
public class CounterInvokable<IN> extends StreamOperatorInvokable<IN, Long> {
public class CounterInvokable<IN> extends StreamInvokable<IN, Long> {
private static final long serialVersionUID = 1L;
Long count = 0L;
......
......@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
public class FilterInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
public class FilterInvokable<IN> extends StreamInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
......
......@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
public class FlatMapInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
public class FlatMapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private FlatMapFunction<IN, OUT> flatMapper;
......
......@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
public class MapInvokable<IN, OUT> extends StreamOperatorInvokable<IN, OUT> {
public class MapInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
private MapFunction<IN, OUT> mapper;
......
......@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.invokable.operator;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
public class StreamReduceInvokable<IN> extends StreamOperatorInvokable<IN, IN> {
public class StreamReduceInvokable<IN> extends StreamInvokable<IN, IN> {
private static final long serialVersionUID = 1L;
protected ReduceFunction<IN> reducer;
......
......@@ -27,7 +27,7 @@ import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<IN1, OUT> {
public CoInvokable(Function userFunction) {
super(userFunction);
......@@ -41,7 +41,6 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
protected StreamRecord<IN2> reuse2;
protected StreamRecordSerializer<IN1> serializer1;
protected StreamRecordSerializer<IN2> serializer2;
protected boolean isMutable;
public void initialize(Collector<OUT> collector,
CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator,
......@@ -71,14 +70,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
this.reuse2 = serializer2.createInstance();
}
public void invoke() throws Exception {
if (this.isMutable) {
mutableInvoke();
} else {
immutableInvoke();
}
}
@Override
protected void immutableInvoke() throws Exception {
while (true) {
int next = recordIterator.next(reuse1, reuse2);
......@@ -96,6 +88,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
}
}
@Override
protected void mutableInvoke() throws Exception {
while (true) {
int next = recordIterator.next(reuse1, reuse2);
......@@ -149,4 +142,8 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamInvokable<OUT> {
}
}
@Override
protected void callUserFunction() throws Exception {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamSink<IN> extends AbstractStreamComponent {
private static final Logger LOG = LoggerFactory.getLogger(StreamSink.class);
private InputHandler<IN> inputHandler;
private StreamOperatorInvokable<IN, IN> userInvokable;
public StreamSink() {
userInvokable = null;
}
@Override
public void setInputsOutputs() {
inputHandler = new InputHandler<IN>(this);
}
@Override
protected void setInvokable() {
userInvokable = configuration.getUserInvokable();
userInvokable.initialize(null, inputHandler.getInputIter(), inputHandler.getInputSerializer(),
isMutable);
}
@Override
public void invoke() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("SINK {} invoked", getName());
}
invokeUserFunction(userInvokable);
if (LOG.isDebugEnabled()) {
LOG.debug("SINK {} invoke finished", getName());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.invokable.SourceInvokable;
public class StreamSource<OUT extends Tuple> extends AbstractStreamComponent {
protected OutputHandler<OUT> outputHandler;
private SourceInvokable<OUT> sourceInvokable;
private static int numSources;
public StreamSource() {
sourceInvokable = null;
numSources = newComponent();
instanceID = numSources;
}
@Override
public void setInputsOutputs() {
outputHandler = new OutputHandler<OUT>(this);
}
@Override
protected void setInvokable() {
sourceInvokable = configuration.getUserInvokable();
sourceInvokable.setCollector(outputHandler.getCollector());
}
@Override
public void invoke() throws Exception {
outputHandler.invokeUserFunction("SOURCE", sourceInvokable);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
public class StreamTask<IN extends Tuple, OUT extends Tuple> extends AbstractStreamComponent {
private InputHandler<IN> inputHandler;
private OutputHandler<OUT> outputHandler;
private StreamOperatorInvokable<IN, OUT> userInvokable;
private static int numTasks;
public StreamTask() {
userInvokable = null;
numTasks = newComponent();
instanceID = numTasks;
}
@Override
public void setInputsOutputs() {
inputHandler = new InputHandler<IN>(this);
outputHandler = new OutputHandler<OUT>(this);
}
@Override
protected void setInvokable() {
userInvokable = configuration.getUserInvokable();
userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
inputHandler.getInputSerializer(), isMutable);
}
@Override
public void invoke() throws Exception {
outputHandler.invokeUserFunction("TASK", userInvokable);
}
}
......@@ -15,12 +15,11 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.api.streamvertex;
import java.util.ArrayList;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
......@@ -30,8 +29,8 @@ import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.streaming.io.CoRecordReader;
import org.apache.flink.util.MutableObjectIterator;
public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tuple> extends
AbstractStreamComponent {
public class CoStreamVertex<IN1, IN2, OUT> extends
StreamVertex<IN1,OUT> {
private OutputHandler<OUT> outputHandler;
......@@ -47,9 +46,9 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
private CoInvokable<IN1, IN2, OUT> userInvokable;
private static int numTasks;
public CoStreamTask() {
public CoStreamVertex() {
userInvokable = null;
numTasks = newComponent();
numTasks = newVertex();
instanceID = numTasks;
}
......@@ -78,7 +77,7 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
inputDeserializer2, isMutable);
}
protected void setConfigInputs() throws StreamComponentException {
protected void setConfigInputs() throws StreamVertexException {
setDeserializers();
int numberOfInputs = configuration.getNumberOfInputs();
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.api.streamvertex;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -35,51 +35,56 @@ public class InputHandler<IN> {
private MutableObjectIterator<StreamRecord<IN>> inputIter;
private MutableReader<IOReadableWritable> inputs;
private AbstractStreamComponent streamComponent;
private StreamVertex<IN,?> streamVertex;
private StreamConfig configuration;
public InputHandler(AbstractStreamComponent streamComponent) {
this.streamComponent = streamComponent;
public InputHandler(StreamVertex<IN,?> streamComponent) {
this.streamVertex = streamComponent;
this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
try {
setConfigInputs();
} catch (Exception e) {
throw new StreamComponentException("Cannot register inputs for "
throw new StreamVertexException("Cannot register inputs for "
+ getClass().getSimpleName(), e);
}
}
@SuppressWarnings("unchecked")
protected void setConfigInputs() throws StreamComponentException {
protected void setConfigInputs() throws StreamVertexException {
setDeserializer();
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs > 0) {
if (numberOfInputs < 2) {
if (numberOfInputs < 2) {
inputs = new MutableRecordReader<IOReadableWritable>(streamComponent);
inputs = new MutableRecordReader<IOReadableWritable>(streamVertex);
} else {
MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
} else {
MutableRecordReader<IOReadableWritable>[] recordReaders = (MutableRecordReader<IOReadableWritable>[]) new MutableRecordReader<?>[numberOfInputs];
for (int i = 0; i < numberOfInputs; i++) {
recordReaders[i] = new MutableRecordReader<IOReadableWritable>(streamComponent);
for (int i = 0; i < numberOfInputs; i++) {
recordReaders[i] = new MutableRecordReader<IOReadableWritable>(streamVertex);
}
inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
}
inputs = new MutableUnionRecordReader<IOReadableWritable>(recordReaders);
}
inputIter = createInputIterator();
inputIter = createInputIterator();
}
}
private void setDeserializer() {
TypeInformation<IN> inTupleTypeInfo = configuration.getTypeInfoIn1();
inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
if (inTupleTypeInfo != null) {
inputSerializer = new StreamRecordSerializer<IN>(inTupleTypeInfo);
}
}
private MutableObjectIterator<StreamRecord<IN>> createInputIterator() {
@SuppressWarnings({ "unchecked", "rawtypes" })
final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(inputs, inputSerializer);
final MutableObjectIterator<StreamRecord<IN>> iter = new ReaderIterator(inputs,
inputSerializer);
return iter;
}
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.api.streamvertex;
import java.io.IOException;
import java.util.LinkedList;
......@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
public class OutputHandler<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
private AbstractStreamComponent streamComponent;
private StreamVertex<?,OUT> streamVertex;
private StreamConfig configuration;
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputs;
......@@ -50,15 +50,15 @@ public class OutputHandler<OUT> {
StreamRecordSerializer<OUT> outSerializer = null;
SerializationDelegate<StreamRecord<OUT>> outSerializationDelegate = null;
public OutputHandler(AbstractStreamComponent streamComponent) {
this.streamComponent = streamComponent;
public OutputHandler(StreamVertex<?,OUT> streamComponent) {
this.streamVertex = streamComponent;
this.outputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
this.configuration = new StreamConfig(streamComponent.getTaskConfiguration());
try {
setConfigOutputs();
} catch (StreamComponentException e) {
throw new StreamComponentException("Cannot register outputs for "
} catch (StreamVertexException e) {
throw new StreamVertexException("Cannot register outputs for "
+ streamComponent.getClass().getSimpleName(), e);
}
}
......@@ -80,13 +80,13 @@ public class OutputHandler<OUT> {
}
private StreamCollector<OUT> setCollector() {
if (streamComponent.configuration.getDirectedEmit()) {
OutputSelector<OUT> outputSelector = streamComponent.configuration.getOutputSelector();
if (streamVertex.configuration.getDirectedEmit()) {
OutputSelector<OUT> outputSelector = streamVertex.configuration.getOutputSelector();
collector = new DirectedStreamCollector<OUT>(streamComponent.getInstanceID(),
collector = new DirectedStreamCollector<OUT>(streamVertex.getInstanceID(),
outSerializationDelegate, outputSelector);
} else {
collector = new StreamCollector<OUT>(streamComponent.getInstanceID(),
collector = new StreamCollector<OUT>(streamVertex.getInstanceID(),
outSerializationDelegate);
}
return collector;
......@@ -98,9 +98,11 @@ public class OutputHandler<OUT> {
void setSerializers() {
outTypeInfo = configuration.getTypeInfoOut1();
outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
outSerializationDelegate.setInstance(outSerializer.createInstance());
if (outTypeInfo != null) {
outSerializer = new StreamRecordSerializer<OUT>(outTypeInfo);
outSerializationDelegate = new SerializationDelegate<StreamRecord<OUT>>(outSerializer);
outSerializationDelegate.setInstance(outSerializer.createInstance());
}
}
void setPartitioner(int outputNumber,
......@@ -111,17 +113,17 @@ public class OutputHandler<OUT> {
outputPartitioner = configuration.getPartitioner(outputNumber);
} catch (Exception e) {
throw new StreamComponentException("Cannot deserialize partitioner for "
+ streamComponent.getName() + " with " + outputNumber + " outputs", e);
throw new StreamVertexException("Cannot deserialize partitioner for "
+ streamVertex.getName() + " with " + outputNumber + " outputs", e);
}
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
if (bufferTimeout > 0) {
output = new StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>(
streamComponent, outputPartitioner, bufferTimeout);
streamVertex, outputPartitioner, bufferTimeout);
} else {
output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamComponent,
output = new RecordWriter<SerializationDelegate<StreamRecord<OUT>>>(streamVertex,
outputPartitioner);
}
......@@ -153,17 +155,17 @@ public class OutputHandler<OUT> {
long startTime;
public void invokeUserFunction(String componentTypeName, StreamInvokable<OUT> userInvokable)
public void invokeUserFunction(String componentTypeName, StreamInvokable<?,OUT> userInvokable)
throws IOException, InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} invoked with instance id {}", componentTypeName,
streamComponent.getName(), streamComponent.getInstanceID());
streamVertex.getName(), streamVertex.getInstanceID());
}
initializeOutputSerializers();
try {
streamComponent.invokeUserFunction(userInvokable);
streamVertex.invokeUserFunction(userInvokable);
} catch (Exception e) {
flushOutputs();
throw new RuntimeException(e);
......@@ -171,7 +173,7 @@ public class OutputHandler<OUT> {
if (LOG.isDebugEnabled()) {
LOG.debug("{} {} invoke finished instance id {}", componentTypeName,
streamComponent.getName(), streamComponent.getInstanceID());
streamVertex.getName(), streamVertex.getInstanceID());
}
flushOutputs();
......
......@@ -15,23 +15,23 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.api.streamvertex;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.io.BlockingQueueBroker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent {
public class StreamIterationHead<OUT extends Tuple> extends StreamVertex<OUT,OUT> {
private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSource.class);
private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
private OutputHandler<OUT> outputHandler;
......@@ -43,8 +43,8 @@ public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComp
private boolean shouldWait;
@SuppressWarnings("rawtypes")
public StreamIterationSource() {
numSources = newComponent();
public StreamIterationHead() {
numSources = newVertex();
instanceID = numSources;
dataChannel = new ArrayBlockingQueue<StreamRecord>(1);
}
......
......@@ -15,21 +15,21 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.api.streamvertex;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.io.BlockingQueueBroker;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamIterationSink<IN extends Tuple> extends AbstractStreamComponent {
public class StreamIterationTail<IN extends Tuple> extends StreamVertex<IN,IN> {
private static final Logger LOG = LoggerFactory.getLogger(StreamIterationSink.class);
private static final Logger LOG = LoggerFactory.getLogger(StreamIterationTail.class);
private InputHandler<IN> inputHandler;
......@@ -39,7 +39,7 @@ public class StreamIterationSink<IN extends Tuple> extends AbstractStreamCompone
private long iterationWaitTime;
private boolean shouldWait;
public StreamIterationSink() {
public StreamIterationTail() {
}
@Override
......@@ -52,7 +52,7 @@ public class StreamIterationSink<IN extends Tuple> extends AbstractStreamCompone
shouldWait = iterationWaitTime > 0;
dataChannel = BlockingQueueBroker.instance().get(iterationId);
} catch (Exception e) {
throw new StreamComponentException(String.format(
throw new StreamVertexException(String.format(
"Cannot register inputs of StreamIterationSink %s", iterationId), e);
}
}
......
......@@ -15,25 +15,37 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.api.streamvertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
public abstract class AbstractStreamComponent extends AbstractInvokable {
public class StreamVertex<IN, OUT> extends AbstractInvokable {
private static int numTasks;
protected StreamConfig configuration;
protected int instanceID;
protected String name;
private static int numComponents = 0;
private static int numVertices = 0;
protected boolean isMutable;
protected Object function;
protected String functionName;
private InputHandler<IN> inputHandler;
private OutputHandler<OUT> outputHandler;
private StreamInvokable<IN, OUT> userInvokable;
public StreamVertex() {
userInvokable = null;
numTasks = newVertex();
instanceID = numTasks;
}
protected static int newComponent() {
numComponents++;
return numComponents;
protected static int newVertex() {
numVertices++;
return numVertices;
}
@Override
......@@ -45,22 +57,30 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
protected void initialize() {
this.configuration = new StreamConfig(getTaskConfiguration());
this.name = configuration.getComponentName();
this.name = configuration.getVertexName();
this.isMutable = configuration.getMutability();
this.functionName = configuration.getFunctionName();
this.function = configuration.getFunction();
}
protected <T> void invokeUserFunction(StreamInvokable<T> userInvokable) throws Exception {
protected <T> void invokeUserFunction(StreamInvokable<?,T> userInvokable) throws Exception {
userInvokable.open(getTaskConfiguration());
userInvokable.invoke();
userInvokable.close();
}
protected abstract void setInputsOutputs();
protected abstract void setInvokable();
public void setInputsOutputs() {
inputHandler = new InputHandler<IN>(this);
outputHandler = new OutputHandler<OUT>(this);
}
protected void setInvokable() {
userInvokable = configuration.getUserInvokable();
userInvokable.initialize(outputHandler.getCollector(), inputHandler.getInputIter(),
inputHandler.getInputSerializer(), isMutable);
}
public String getName() {
return name;
}
......@@ -68,4 +88,9 @@ public abstract class AbstractStreamComponent extends AbstractInvokable {
public int getInstanceID() {
return instanceID;
}
@Override
public void invoke() throws Exception {
outputHandler.invokeUserFunction("TASK", userInvokable);
}
}
......@@ -15,13 +15,13 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.api.streamvertex;
/**
* An exception that is thrown by the stream components when encountering an
* An exception that is thrown by the stream verices when encountering an
* illegal condition.
*/
public class StreamComponentException extends RuntimeException {
public class StreamVertexException extends RuntimeException {
/**
* Serial version UID for serialization interoperability.
......@@ -31,7 +31,7 @@ public class StreamComponentException extends RuntimeException {
/**
* Creates a compiler exception with no message and no cause.
*/
public StreamComponentException() {
public StreamVertexException() {
}
/**
......@@ -40,7 +40,7 @@ public class StreamComponentException extends RuntimeException {
* @param message
* The message for the exception.
*/
public StreamComponentException(String message) {
public StreamVertexException(String message) {
super(message);
}
......@@ -50,7 +50,7 @@ public class StreamComponentException extends RuntimeException {
* @param cause
* The <tt>Throwable</tt> that caused this exception.
*/
public StreamComponentException(Throwable cause) {
public StreamVertexException(Throwable cause) {
super(cause);
}
......@@ -62,7 +62,7 @@ public class StreamComponentException extends RuntimeException {
* @param cause
* The <tt>Throwable</tt> that caused this exception.
*/
public StreamComponentException(String message, Throwable cause) {
public StreamVertexException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -18,13 +18,13 @@
package org.apache.flink.streaming.api.collector;
import static org.junit.Assert.assertArrayEquals;
import java.util.ArrayList;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
import org.apache.flink.streaming.util.MockRecordWriterFactory;
import org.junit.Test;
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.api.streamvertex;
import java.util.ArrayList;
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.api.streamvertex;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
......@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
public class StreamComponentTest {
public class StreamVertexTest {
private static Map<Integer, Integer> data = new HashMap<Integer, Integer>();
......@@ -81,24 +81,11 @@ public class StreamComponentTest {
private static final int SOURCE_PARALELISM = 1;
private static final long MEMORYSIZE = 32;
// @Test
@Test
public void wrongJobGraph() {
LocalStreamEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(SOURCE_PARALELISM);
try {
env.execute();
fail();
} catch (Exception e) {
}
env.fromCollection(Arrays.asList("a", "b"));
try {
env.execute();
fail();
} catch (Exception e) {
}
try {
env.fromCollection(null);
......
......@@ -25,7 +25,7 @@ import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.util.Collector;
......@@ -88,7 +88,7 @@ public class MockInvokable<IN, OUT> {
return iterator;
}
public static <IN, OUT> List<OUT> createAndExecute(StreamOperatorInvokable<IN, OUT> invokable, List<IN> inputs) {
public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, List<IN> inputs) {
MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
try {
......
......@@ -21,7 +21,7 @@ import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
import org.mockito.Mockito;
public class MockRecordWriterFactory {
......
......@@ -23,8 +23,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class WindowJoinLocal {
private static final int PARALLELISM = 1;
private static final int SOURCE_PARALLELISM = 1;
private static final int PARALLELISM = 4;
private static final int SOURCE_PARALLELISM = 2;
// This example will join two streams with a sliding window. One which emits
// people's grades and one which emits people's salaries.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册