Commit c3ec1e1f authored by G gyfora, committed by Stephan Ewen

[streaming] CoRecordReader and iterator added + ConnectedDataStream refactor

Parent fa75af09
......@@ -17,16 +17,16 @@
package org.apache.flink.streaming.api.collector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.StringUtils;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.util.StringUtils;
/**
* A StreamCollector that uses user defined output names and a user defined
......@@ -39,7 +39,7 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
OutputSelector<OUT> outputSelector;
private static final Log LOG = LogFactory.getLog(DirectedStreamCollector.class);
private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
private Set<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> emitted;
/**
* Creates a new DirectedStreamCollector
......@@ -56,7 +56,7 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
OutputSelector<OUT> outputSelector) {
super(channelID, serializationDelegate);
this.outputSelector = outputSelector;
this.emitted = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
}
......
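Why emitted changed from a List to a Set in the hunk above: when the OutputSelector returns several output names that resolve to the same physical RecordWriter, a List lets the collector forward the same record once per name. Below is a minimal sketch of the dedup idea, using hypothetical Writer and lookup types rather than the real collector internals:
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
final class DirectedEmitSketch<T> {
    interface Writer<R> {
        void emit(R record);
    }
    // A record can be selected under several output names, and names may share a writer.
    void emit(T record, List<String> selectedNames, Map<String, Writer<T>> writersByName) {
        Set<Writer<T>> emitted = new HashSet<Writer<T>>(); // was a List before this commit
        for (String name : selectedNames) {
            Writer<T> writer = writersByName.get(name);
            // Set.add returns false for a writer that was already used, so the
            // record reaches each physical output channel at most once.
            if (writer != null && emitted.add(writer)) {
                writer.emit(record);
            }
        }
    }
}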
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import java.io.Serializable;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.RichReduceFunction;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
/**
* The ConnectedDataStream represents a stream for two different data types. It
* can be used to apply transformations like {@link CoMapFunction} on two
* {@link DataStream}s
*
* @param <IN1>
* Type of the first DataStream.
* @param <IN2>
* Type of the second DataStream.
*/
public class ConnectedDataStream<IN1, IN2> {
StreamExecutionEnvironment environment;
JobGraphBuilder jobGraphBuilder;
DataStream<IN1> input1;
DataStream<IN2> input2;
protected ConnectedDataStream(StreamExecutionEnvironment environment,
JobGraphBuilder jobGraphBuilder, DataStream<IN1> input1,
DataStream<IN2> input2) {
this.jobGraphBuilder = jobGraphBuilder;
this.environment = environment;
this.input1 = input1.copy();
this.input2 = input2.copy();
}
/**
* Returns the first {@link DataStream}.
*
* @return The first DataStream.
*/
public DataStream<IN1> getFirst() {
return input1.copy();
}
/**
* Returns the second {@link DataStream}.
*
* @return The second DataStream.
*/
public DataStream<IN2> getSecond() {
return input2.copy();
}
/**
* Sets the partitioning of the two separate {@link DataStream}s so that the
* output tuples are partitioned by their hashcode and are sent to only one
* component.
*
* @param keyPosition1
* The field used to compute the hashcode of the elements in the
* first input stream.
* @param keyPosition2
* The field used to compute the hashcode of the elements in the
* second input stream.
* @return The DataStream with field partitioning set.
*/
public ConnectedDataStream<IN1, IN2> partitionBy(int keyPosition1,
int keyPosition2) {
if (keyPosition1 < 0 || keyPosition2 < 0) {
throw new IllegalArgumentException(
"The position of the field must be non-negative");
}
return new ConnectedDataStream<IN1, IN2>(this.environment,
this.jobGraphBuilder, getFirst().setConnectionType(
new FieldsPartitioner<IN1>(keyPosition1)), getSecond()
.setConnectionType(
new FieldsPartitioner<IN2>(keyPosition2)));
}
/**
* GroupBy operation for connected data stream. Groups the elements of
* input1 and input2 according to keyPosition1 and keyPosition2. Typically
* used before a reduce operation.
*
* @param keyPosition1
* The field used to compute the hashcode of the elements in the
* first input stream.
* @param keyPosition2
* The field used to compute the hashcode of the elements in the
* second input stream.
* @return The grouped {@link ConnectedDataStream}.
*/
public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1,
int keyPosition2) {
return this.partitionBy(keyPosition1, keyPosition2);
}
/**
* Applies a CoMap transformation on two separate {@link DataStream}s. The
* transformation calls a {@link CoMapFunction#map1} for each element of the
* first input and {@link CoMapFunction#map2} for each element of the second
* input. Each CoMapFunction call returns exactly one element. The user can
* also extend {@link RichCoMapFunction} to gain access to other features
* provided by the {@link RichFunction} interface.
*
* @param coMapper
* The CoMapFunction used to jointly transform the two input
* DataStreams
* @return The transformed DataStream
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> map(
CoMapFunction<IN1, IN2, OUT> coMapper) {
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(
coMapper, CoMapFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(
coMapper, CoMapFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(
coMapper, CoMapFunction.class, 2);
return addCoFunction("coMap", coMapper,
in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
new CoMapInvokable<IN1, IN2, OUT>(coMapper));
}
/**
* Applies a CoFlatMap transformation on two separate {@link DataStream}s.
* The transformation calls a {@link CoFlatMapFunction#map1} for each
* element of the first input and {@link CoFlatMapFunction#map2} for each
* element of the second input. Each CoFlatMapFunction call returns any
* number of elements including none. The user can also extend
* {@link RichFlatMapFunction} to gain access to other features provided by
* the {@link RichFunction} interface.
*
* @param coFlatMapper
* The CoFlatMapFunction used to jointly transform the two input
* DataStreams
* @return The transformed DataStream
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(
coFlatMapper, CoFlatMapFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(
coFlatMapper, CoFlatMapFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(
coFlatMapper, CoFlatMapFunction.class, 2);
return addCoFunction("coFlatMap", coFlatMapper,
in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
}
/**
* Applies a CoReduce transformation on the data stream grouped by the
* given key positions. The {@link CoReduceFunction} will receive
* input values based on the key positions. The transformation calls
* {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
* each element of the first input and {@link CoReduceFunction#reduce2} and
* {@link CoReduceFunction#map2} for each element of the second input. For
* each input, only values with the same key will go to the same reducer.
* The user can also extend {@link RichReduceFunction} to gain access to
* other features provided by the {@link RichFunction} interface.
*
* @param coReducer
* The {@link CoReduceFunction} that will be called for every two
* elements with the same key of each input DataStream.
* @param keyPosition1
* position of the key in the first input DataStream
* @param keyPosition2
* position of the key in the second input DataStream
* @return The transformed DataStream.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(
CoReduceFunction<IN1, IN2, OUT> coReducer, int keyPosition1,
int keyPosition2) {
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(
coReducer, CoReduceFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(
coReducer, CoReduceFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(
coReducer, CoReduceFunction.class, 2);
return addCoFunction("coReduce", coReducer,
in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
new CoGroupReduceInvokable<IN1, IN2, OUT>(coReducer,
keyPosition1, keyPosition2));
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(
String functionName, final Function function,
TypeSerializerWrapper<IN1> in1TypeWrapper,
TypeSerializerWrapper<IN2> in2TypeWrapper,
TypeSerializerWrapper<OUT> outTypeWrapper,
CoInvokable<IN1, IN2, OUT> functionInvokable) {
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
environment, functionName);
try {
input1.jobGraphBuilder.addCoTask(returnStream.getId(),
functionInvokable, in1TypeWrapper, in2TypeWrapper, outTypeWrapper, functionName,
SerializationUtils.serialize((Serializable) function),
environment.getDegreeOfParallelism());
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize user defined function");
}
input1.connectGraph(input1, returnStream.getId(), 1);
input1.connectGraph(input2, returnStream.getId(), 2);
// TODO consider iteration
return returnStream;
}
}
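For orientation, the externally visible API change in this refactor (confirmed by the CoGroupReduceTest diff further down): the key positions move out of reduce(...) and into groupBy(...), which now returns a GroupedConnectedDataStream. Sketch, with ds1 and ds2 as assumed input streams:
// Before this commit: groupBy returned a ConnectedDataStream and reduce
// took the key positions again.
//     ds1.connect(ds2).groupBy(0, 0).reduce(new MyCoReduceFunction(), 0, 0);
// After this commit: groupBy carries the key positions itself.
//     ds1.connect(ds2).groupBy(0, 0).reduce(new MyCoReduceFunction());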
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.streaming.api.datastream;
import java.io.Serializable;
import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.java.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeSerializerWrapper;
/**
* The ConnectedDataStream represents a stream for two different data types. It
* can be used to apply transformations like {@link CoMapFunction} on two
* {@link DataStream}s
*
* @param <IN1>
* Type of the first DataStream.
* @param <IN2>
* Type of the second DataStream.
*/
public class ConnectedDataStream<IN1, IN2> {
StreamExecutionEnvironment environment;
JobGraphBuilder jobGraphBuilder;
DataStream<IN1> input1;
DataStream<IN2> input2;
protected ConnectedDataStream(StreamExecutionEnvironment environment,
JobGraphBuilder jobGraphBuilder, DataStream<IN1> input1, DataStream<IN2> input2) {
this.jobGraphBuilder = jobGraphBuilder;
this.environment = environment;
this.input1 = input1.copy();
this.input2 = input2.copy();
}
/**
* Returns the first {@link DataStream}.
*
* @return The first DataStream.
*/
public DataStream<IN1> getFirst() {
return input1.copy();
}
/**
* Returns the second {@link DataStream}.
*
* @return The second DataStream.
*/
public DataStream<IN2> getSecond() {
return input2.copy();
}
/**
* GroupBy operation for connected data stream. Groups the elements of
* input1 and input2 according to keyPosition1 and keyPosition2. Used for
* applying functions on grouped data streams, for example
* {@link GroupedConnectedDataStream#reduce}.
*
* @param keyPosition1
* The field used to compute the hashcode of the elements in the
* first input stream.
* @param keyPosition2
* The field used to compute the hashcode of the elements in the
* second input stream.
* @return The {@link GroupedConnectedDataStream} created.
*/
public GroupedConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
if (keyPosition1 < 0 || keyPosition2 < 0) {
throw new IllegalArgumentException("The position of the field must be non-negative");
}
return new GroupedConnectedDataStream<IN1, IN2>(this.environment, this.jobGraphBuilder,
getFirst().partitionBy(keyPosition1), getSecond().partitionBy(keyPosition2),
keyPosition1, keyPosition2);
}
/**
* Applies a CoMap transformation on two separate {@link DataStream}s. The
* transformation calls a {@link CoMapFunction#map1} for each element of the
* first input and {@link CoMapFunction#map2} for each element of the second
* input. Each CoMapFunction call returns exactly one element. The user can
* also extend {@link RichCoMapFunction} to gain access to other features
* provided by the {@link RichFunction} interface.
*
* @param coMapper
* The CoMapFunction used to jointly transform the two input
* DataStreams
* @return The transformed DataStream
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coMapper,
CoMapFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coMapper,
CoMapFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coMapper,
CoMapFunction.class, 2);
return addCoFunction("coMap", coMapper, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
new CoMapInvokable<IN1, IN2, OUT>(coMapper));
}
/**
* Applies a CoFlatMap transformation on two separate {@link DataStream}s.
* The transformation calls a {@link CoFlatMapFunction#map1} for each
* element of the first input and {@link CoFlatMapFunction#map2} for each
* element of the second input. Each CoFlatMapFunction call returns any
* number of elements including none. The user can also extend
* {@link RichFlatMapFunction} to gain access to other features provided by
* the {@link RichFunction} interface.
*
* @param coFlatMapper
* The CoFlatMapFunction used to jointly transform the two input
* DataStreams
* @return The transformed DataStream
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coFlatMapper,
CoFlatMapFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coFlatMapper,
CoFlatMapFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coFlatMapper,
CoFlatMapFunction.class, 2);
return addCoFunction("coFlatMap", coFlatMapper, in1TypeWrapper, in2TypeWrapper,
outTypeWrapper, new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
final Function function, TypeSerializerWrapper<IN1> in1TypeWrapper,
TypeSerializerWrapper<IN2> in2TypeWrapper, TypeSerializerWrapper<OUT> outTypeWrapper,
CoInvokable<IN1, IN2, OUT> functionInvokable) {
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
environment, functionName);
try {
input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
in1TypeWrapper, in2TypeWrapper, outTypeWrapper, functionName,
SerializationUtils.serialize((Serializable) function),
environment.getDegreeOfParallelism());
} catch (SerializationException e) {
throw new RuntimeException("Cannot serialize user defined function");
}
input1.connectGraph(input1, returnStream.getId(), 1);
input1.connectGraph(input2, returnStream.getId(), 2);
// TODO consider iteration
return returnStream;
}
}
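A hedged usage sketch for the refactored class above: two streams of different element types are connected and jointly transformed, with map1 applied to elements of the first input and map2 to elements of the second. The environment, element values, and variable names are assumptions for illustration:
DataStream<Integer> ds1 = env.fromElements(1, 2, 3);      // hypothetical sources
DataStream<String> ds2 = env.fromElements("a", "b", "c");
SingleOutputStreamOperator<String, ?> mapped = ds1.connect(ds2).map(
        new CoMapFunction<Integer, String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public String map1(Integer value) {  // called for each element of input 1
                return "first: " + value;
            }
            @Override
            public String map2(String value) {   // called for each element of input 2
                return "second: " + value;
            }
        });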
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.datastream;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
public class GroupedConnectedDataStream<IN1, IN2> extends ConnectedDataStream<IN1, IN2> {
int keyPosition1;
int keyPosition2;
protected GroupedConnectedDataStream(StreamExecutionEnvironment environment,
JobGraphBuilder jobGraphBuilder, DataStream<IN1> input1, DataStream<IN2> input2,
int keyPosition1, int keyPosition2) {
super(environment, jobGraphBuilder, input1, input2);
this.keyPosition1 = keyPosition1;
this.keyPosition2 = keyPosition2;
}
/**
* Applies a CoReduce transformation on the data stream grouped by the
* given key positions. The {@link CoReduceFunction} will receive
* input values based on the key positions. The transformation calls
* {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
* each element of the first input and {@link CoReduceFunction#reduce2} and
* {@link CoReduceFunction#map2} for each element of the second input. For
* each input, only values with the same key will go to the same reducer.
*
* @param coReducer
* The {@link CoReduceFunction} that will be called for every two
* elements with the same key of each input DataStream.
* @return The transformed DataStream.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
CoReduceFunction.class, 0);
FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
CoReduceFunction.class, 1);
FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
CoReduceFunction.class, 2);
return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
new CoGroupReduceInvokable<IN1, IN2, OUT>(coReducer, keyPosition1, keyPosition2));
}
}
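A hedged sketch of a CoReduceFunction for the grouped reduce above. The reduce1/reduce2 and map1/map2 method set follows the javadoc; the exact signatures and the Integer/String element types are assumptions:
CoReduceFunction<Integer, Integer, String> coReducer =
        new CoReduceFunction<Integer, Integer, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer reduce1(Integer value1, Integer value2) {
                return value1 + value2;          // combine two input-1 values with the same key
            }
            @Override
            public Integer reduce2(Integer value1, Integer value2) {
                return Math.max(value1, value2); // combine two input-2 values with the same key
            }
            @Override
            public String map1(Integer value) {
                return "sum: " + value;          // output emitted for each input-1 element
            }
            @Override
            public String map2(Integer value) {
                return "max: " + value;          // output emitted for each input-2 element
            }
        };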
......@@ -21,8 +21,8 @@ import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokable<OUT> {
......@@ -32,8 +32,7 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokabl
private static final long serialVersionUID = 1L;
protected MutableObjectIterator<StreamRecord<IN1>> recordIterator1;
protected MutableObjectIterator<StreamRecord<IN2>> recordIterator2;
protected CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator;
protected StreamRecord<IN1> reuse1;
protected StreamRecord<IN2> reuse2;
protected StreamRecordSerializer<IN1> serializer1;
......@@ -41,16 +40,13 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokabl
protected boolean isMutable;
public void initialize(Collector<OUT> collector,
MutableObjectIterator<StreamRecord<IN1>> recordIterator1,
StreamRecordSerializer<IN1> serializer1,
MutableObjectIterator<StreamRecord<IN2>> recordIterator2,
StreamRecordSerializer<IN2> serializer2, boolean isMutable) {
CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> recordIterator,
StreamRecordSerializer<IN1> serializer1, StreamRecordSerializer<IN2> serializer2,
boolean isMutable) {
this.collector = collector;
this.recordIterator1 = recordIterator1;
this.recordIterator = recordIterator;
this.reuse1 = serializer1.createInstance();
this.recordIterator2 = recordIterator2;
this.reuse2 = serializer2.createInstance();
this.serializer1 = serializer1;
......@@ -58,30 +54,32 @@ public abstract class CoInvokable<IN1, IN2, OUT> extends StreamComponentInvokabl
this.isMutable = isMutable;
}
public void resetReuse() {
protected void resetReuseAll() {
this.reuse1 = serializer1.createInstance();
this.reuse2 = serializer2.createInstance();
}
public void invoke() throws Exception {
boolean noMoreRecordOnInput1 = false;
boolean noMoreRecordOnInput2 = false;
protected void resetReuse1() {
this.reuse1 = serializer1.createInstance();
}
do {
noMoreRecordOnInput1 = ((reuse1 = recordIterator1.next(reuse1)) == null);
if (!noMoreRecordOnInput1) {
handleStream1();
}
protected void resetReuse2() {
this.reuse2 = serializer2.createInstance();
}
noMoreRecordOnInput2 = ((reuse2 = recordIterator2.next(reuse2)) == null);
if (!noMoreRecordOnInput2) {
public void invoke() throws Exception {
while (true) {
int next = recordIterator.next(reuse1, reuse2);
if (next == 0) {
break;
} else if (next == 1) {
handleStream1();
resetReuse1();
} else {
handleStream2();
resetReuse2();
}
if (!this.isMutable) {
resetReuse();
}
} while (!noMoreRecordOnInput1 || !noMoreRecordOnInput2);
}
}
public abstract void handleStream1() throws Exception;
......
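The rewritten invoke() above replaces the two alternating per-input loops with a single dispatch loop: CoReaderIterator.next fills reuse1 or reuse2 and signals which input produced data through its return code (0 means both inputs are exhausted). A hedged sketch of how a subclass plugs into that protocol; the StreamRecord accessor name is an assumption:
public class CoMapSketch<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
    private static final long serialVersionUID = 1L;
    private final CoMapFunction<IN1, IN2, OUT> mapper;
    public CoMapSketch(CoMapFunction<IN1, IN2, OUT> mapper) {
        this.mapper = mapper;
    }
    @Override
    public void handleStream1() throws Exception {
        // invoke() has just filled reuse1 from input 1 and resets it afterwards
        collector.collect(mapper.map1(reuse1.getObject())); // getObject() is a hypothetical accessor
    }
    @Override
    public void handleStream2() throws Exception {
        collector.collect(mapper.map2(reuse2.getObject()));
    }
}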
......@@ -20,13 +20,13 @@ package org.apache.flink.streaming.api.streamcomponent;
import java.util.ArrayList;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.MutableReader;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.MutableUnionRecordReader;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
import org.apache.flink.streaming.io.CoRecordReader;
import org.apache.flink.types.TypeInformation;
import org.apache.flink.util.MutableObjectIterator;
......@@ -34,15 +34,16 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
AbstractStreamComponent {
private OutputHandler<OUT> outputHandler;
protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
private MutableReader<IOReadableWritable> inputs1;
private MutableReader<IOReadableWritable> inputs2;
MutableObjectIterator<StreamRecord<IN1>> inputIter1;
MutableObjectIterator<StreamRecord<IN2>> inputIter2;
CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
private CoInvokable<IN1, IN2, OUT> userInvokable;
private static int numTasks;
......@@ -52,30 +53,29 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
instanceID = numTasks;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void setDeserializers() {
TypeInformation<IN1> inputTypeInfo1 = configuration.getTypeInfoIn1();
inputDeserializer1 = new StreamRecordSerializer<IN1>(inputTypeInfo1);
TypeInformation<IN2> inputTypeInfo2 = configuration.getTypeInfoIn2();
inputDeserializer2 = new StreamRecordSerializer(inputTypeInfo2);
inputDeserializer2 = new StreamRecordSerializer<IN2>(inputTypeInfo2);
}
@Override
public void setInputsOutputs() {
outputHandler = new OutputHandler<OUT>(this);
setConfigInputs();
inputIter1 = InputHandler.staticCreateInputIterator(inputs1, inputDeserializer1);
inputIter2 = InputHandler.staticCreateInputIterator(inputs2, inputDeserializer2);
coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
inputDeserializer1, inputDeserializer2);
}
@Override
protected void setInvokable() {
userInvokable = configuration.getUserInvokable();
userInvokable.initialize(outputHandler.getCollector(), inputIter1, inputDeserializer1,
inputIter2, inputDeserializer2, isMutable);
userInvokable.initialize(outputHandler.getCollector(), coIter, inputDeserializer1,
inputDeserializer2, isMutable);
}
protected void setConfigInputs() throws StreamComponentException {
......@@ -83,39 +83,27 @@ public class CoStreamTask<IN1 extends Tuple, IN2 extends Tuple, OUT extends Tupl
int numberOfInputs = configuration.getNumberOfInputs();
ArrayList<MutableRecordReader<IOReadableWritable>> inputList1 = new ArrayList<MutableRecordReader<IOReadableWritable>>();
ArrayList<MutableRecordReader<IOReadableWritable>> inputList2 = new ArrayList<MutableRecordReader<IOReadableWritable>>();
ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>> inputList1 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>>();
ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>> inputList2 = new ArrayList<MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>>();
for (int i = 0; i < numberOfInputs; i++) {
int inputType = configuration.getInputType(i);
switch (inputType) {
case 1:
inputList1.add(new MutableRecordReader<IOReadableWritable>(this));
inputList1.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN1>>>(
this));
break;
case 2:
inputList2.add(new MutableRecordReader<IOReadableWritable>(this));
inputList2.add(new MutableRecordReader<DeserializationDelegate<StreamRecord<IN2>>>(
this));
break;
default:
throw new RuntimeException("Invalid input type number: " + inputType);
}
}
inputs1 = getInputs(inputList1);
inputs2 = getInputs(inputList2);
}
@SuppressWarnings("unchecked")
private MutableReader<IOReadableWritable> getInputs(
ArrayList<MutableRecordReader<IOReadableWritable>> inputList) {
if (inputList.size() == 1) {
return inputList.get(0);
} else if (inputList.size() > 1) {
MutableRecordReader<IOReadableWritable>[] inputArray = inputList
.toArray(new MutableRecordReader[inputList.size()]);
return new MutableUnionRecordReader<IOReadableWritable>(inputArray);
}
return null;
coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
inputList1, inputList2);
}
@Override
......
......@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.collector.StreamCollector;
import org.apache.flink.streaming.api.invokable.StreamComponentInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.StreamRecordWriter;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.types.TypeInformation;
......
......@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.io.BlockingQueueBroker;
import org.apache.flink.util.StringUtils;
public class StreamIterationSink<IN extends Tuple> extends
......
......@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.io.network.api.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.io.BlockingQueueBroker;
public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComponent {
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.io;
import java.util.concurrent.BlockingQueue;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.io;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
/**
* A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
* input types.
*/
public final class CoReaderIterator<T1, T2> {
private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the source
private final DeserializationDelegate<T1> delegate1;
private final DeserializationDelegate<T2> delegate2;
public CoReaderIterator(
CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
TypeSerializer<T1> serializer1, TypeSerializer<T2> serializer2) {
this.reader = reader;
this.delegate1 = new DeserializationDelegate<T1>(serializer1);
this.delegate2 = new DeserializationDelegate<T2>(serializer2);
}
public int next(T1 target1, T2 target2) throws IOException {
this.delegate1.setInstance(target1);
this.delegate2.setInstance(target2);
try {
return this.reader.getNextRecord(this.delegate1, this.delegate2);
} catch (InterruptedException e) {
throw new IOException("Reader interrupted.", e);
}
}
}
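A hedged sketch of the contract next() establishes, written against a stand-in interface instead of the real reader stack. Compared with the per-input MutableObjectIterator it replaces (which returned the reuse object, or null when exhausted), one call now serves both inputs and the int return code says which reuse object was filled:
import java.io.IOException;
final class ReadLoopSketch {
    interface CoIter<A, B> { // stand-in for CoReaderIterator
        int next(A reuse1, B reuse2) throws IOException; // 0 done, 1 filled reuse1, 2 filled reuse2
    }
    static <A, B> void drain(CoIter<A, B> iter, A reuse1, B reuse2) throws IOException {
        while (true) {
            int next = iter.next(reuse1, reuse2);
            if (next == 0) {
                return;  // both inputs exhausted
            } else if (next == 1) {
                // consume reuse1: it now holds a record from input 1
            } else {
                // next == 2, consume reuse2: it now holds a record from input 2
            }
        }
    }
}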
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.io;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.event.task.AbstractTaskEvent;
import org.apache.flink.runtime.io.network.api.AbstractRecordReader;
import org.apache.flink.runtime.io.network.api.MutableRecordReader;
import org.apache.flink.runtime.io.network.gates.InputChannelResult;
import org.apache.flink.runtime.io.network.gates.InputGate;
import org.apache.flink.runtime.io.network.gates.RecordAvailabilityListener;
/**
* A CoRecordReader wraps {@link MutableRecordReader}s of two different input
* types to read records efficiently.
*/
@SuppressWarnings("rawtypes")
public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends
AbstractRecordReader implements RecordAvailabilityListener {
/**
* Sets of input gates for the two input types
*/
private Set<InputGate<T1>> inputGates1;
private Set<InputGate<T2>> inputGates2;
private final Set<InputGate> remainingInputGates;
/**
* Queue of input gates that currently have at least one available record.
*/
private final ArrayDeque<InputGate> availableInputGates = new ArrayDeque<InputGate>();
/**
* The next input gate to read a record from.
*/
private InputGate nextInputGateToReadFrom;
@Override
public boolean isInputClosed() {
return this.remainingInputGates.isEmpty();
}
@SuppressWarnings("unchecked")
public CoRecordReader(ArrayList<MutableRecordReader<T1>> inputList1,
ArrayList<MutableRecordReader<T2>> inputList2) {
if (inputList1 == null || inputList2 == null) {
throw new IllegalArgumentException("Provided argument recordReaders is null");
}
this.inputGates1 = new HashSet<InputGate<T1>>();
this.inputGates2 = new HashSet<InputGate<T2>>();
this.remainingInputGates = new HashSet<InputGate>(
(int) ((inputList1.size() + inputList2.size()) * 1.6f)); // size from the reader lists; inputGates1/2 are still empty here
for (MutableRecordReader<T1> reader : inputList1) {
InputGate<T1> inputGate = (InputGate<T1>) reader.getInputGate();
inputGate.registerRecordAvailabilityListener(this);
this.inputGates1.add(inputGate);
this.remainingInputGates.add(inputGate);
}
for (MutableRecordReader<T2> reader : inputList2) {
InputGate<T2> inputGate = (InputGate<T2>) reader.getInputGate();
inputGate.registerRecordAvailabilityListener(this);
this.inputGates2.add(inputGate);
this.remainingInputGates.add(inputGate);
}
}
@Override
public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
for (InputGate<T1> gate : this.inputGates1) {
gate.publishEvent(event);
}
for (InputGate<T2> gate : this.inputGates2) {
gate.publishEvent(event);
}
}
@Override
public void reportRecordAvailability(InputGate inputGate) {
synchronized (this.availableInputGates) {
this.availableInputGates.add(inputGate);
this.availableInputGates.notifyAll();
}
}
@SuppressWarnings("unchecked")
protected int getNextRecord(T1 target1, T2 target2) throws IOException, InterruptedException {
int out;
while (true) {
// has the current input gate more data?
if (this.nextInputGateToReadFrom == null) {
if (this.remainingInputGates.isEmpty()) {
return 0;
}
this.nextInputGateToReadFrom = getNextAvailableInputGate();
}
InputChannelResult result;
if (inputGates1.contains(this.nextInputGateToReadFrom)) {
result = this.nextInputGateToReadFrom.readRecord(target1);
out = 1;
} else {
result = this.nextInputGateToReadFrom.readRecord(target2);
out = 2;
}
switch (result) {
case INTERMEDIATE_RECORD_FROM_BUFFER: // record is available and we
// can stay on the same
// channel
return out;
case LAST_RECORD_FROM_BUFFER: // record is available, but we need to
// re-check the channels
this.nextInputGateToReadFrom = null;
return out;
case END_OF_SUPERSTEP:
this.nextInputGateToReadFrom = null;
if (incrementEndOfSuperstepEventAndCheck()) {
return 0; // end of the superstep
} else {
break; // fall through and wait for next record/event
}
case TASK_EVENT: // event for the subscribers is available
handleEvent(this.nextInputGateToReadFrom.getCurrentEvent());
this.nextInputGateToReadFrom = null;
break;
case END_OF_STREAM: // one gate is empty
this.remainingInputGates.remove(this.nextInputGateToReadFrom);
this.nextInputGateToReadFrom = null;
break;
case NONE: // gate processed an internal event and could not return
// a record on this call
this.nextInputGateToReadFrom = null;
break;
}
}
}
private InputGate getNextAvailableInputGate() throws InterruptedException {
synchronized (this.availableInputGates) {
while (this.availableInputGates.isEmpty()) {
this.availableInputGates.wait();
}
return this.availableInputGates.pop();
}
}
}
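The reader above parks in getNextAvailableInputGate() until some gate announces data through reportRecordAvailability(). A minimal, self-contained sketch of that wait/notify handshake with a generic gate type (the class and method names are assumptions):
import java.util.ArrayDeque;
final class AvailabilityQueue<G> {
    private final ArrayDeque<G> available = new ArrayDeque<G>();
    // producer side, mirroring reportRecordAvailability(InputGate)
    void report(G gate) {
        synchronized (available) {
            available.add(gate);
            available.notifyAll(); // wake a reader blocked in take()
        }
    }
    // consumer side, mirroring getNextAvailableInputGate()
    G take() throws InterruptedException {
        synchronized (available) {
            while (available.isEmpty()) {
                available.wait();  // releases the monitor while waiting
            }
            return available.pop();
        }
    }
}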
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.flink.streaming.api.streamcomponent;
package org.apache.flink.streaming.io;
import java.io.IOException;
......
......@@ -53,5 +53,6 @@ public class PrintTest{
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
env.executeTest(MEMORYSIZE);
}
}
......@@ -104,7 +104,7 @@ public class CoGroupReduceTest {
@SuppressWarnings({ "unused", "unchecked" })
DataStream<String> ds4 = env1.fromElements(word1, word2, word3).connect(ds2).groupBy(0, 0)
.reduce(new MyCoReduceFunction(), 0, 0).addSink(new EmptySink());
.reduce(new MyCoReduceFunction()).addSink(new EmptySink());
env1.executeTest(32);
......@@ -142,7 +142,7 @@ public class CoGroupReduceTest {
@SuppressWarnings({ "unused", "unchecked" })
DataStream<String> ds4 = env2.fromElements(word1, word2, word3).connect(ds2).groupBy(2, 0)
.reduce(new MyCoReduceFunction(), 2, 0).addSink(new EmptySink());
.reduce(new MyCoReduceFunction()).addSink(new EmptySink());
env2.executeTest(32);
......
......@@ -39,6 +39,7 @@ public class IncrementalLearningSkeleton {
// Method for pulling new data for prediction
private Integer getNewData() throws InterruptedException {
Thread.sleep(1000);
return 1;
}
}
......@@ -58,6 +59,7 @@ public class IncrementalLearningSkeleton {
// Method for pulling new training data
private Integer getTrainingData() throws InterruptedException {
Thread.sleep(1000);
return 1;
}
......@@ -97,7 +99,7 @@ public class IncrementalLearningSkeleton {
// Update model
partialModel = value;
batchModel = getBatchModel();
return 0;
return 1;
}
// Pulls model built with batch-job on the old training data
......@@ -122,7 +124,7 @@ public class IncrementalLearningSkeleton {
// Build new model on every second of new data
DataStream<Double[]> model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
.windowReduce(new PartialModelBuilder(), 1000);
.windowReduce(new PartialModelBuilder(), 5000);
// Use partial model for prediction
DataStream<Integer> prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)
......
......@@ -69,7 +69,7 @@ public abstract class AbstractSingleGateRecordReader<T extends IOReadableWritabl
this.inputGate.publishEvent(event);
}
InputGate<T> getInputGate() {
public InputGate<T> getInputGate() {
return this.inputGate;
}
}