提交 5d72dd49 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] Iteration update to use proper partitioning and forward connection...

[streaming] Iteration update to use proper partitioning and forward connection + DataStream refactor and Javadoc update
上级 859983cc
......@@ -82,23 +82,29 @@ public class DataStream<T extends Tuple> {
counter++;
this.id = operatorType + "-" + counter.toString();
this.environment = environment;
this.degreeOfParallelism = environment.getDegreeOfParallelism();
initConnections();
}
/**
* Create a new {@link DataStream} in the given environment with the given
* id
* Create a new DataStream by creating a copy of another DataStream
*
* @param environment
* StreamExecutionEnvironment
* @param id
* The id of the DataStream
* @param dataStream
* The DataStream that will be copied.
*/
protected DataStream(StreamExecutionEnvironment environment, String operatorType, String id) {
this.environment = environment;
this.id = id;
initConnections();
protected DataStream(DataStream<T> dataStream) {
this.environment = dataStream.environment;
this.type = dataStream.type;
this.id = dataStream.id;
this.degreeOfParallelism = dataStream.degreeOfParallelism;
this.userDefinedName = dataStream.userDefinedName;
this.outputSelector = dataStream.outputSelector;
this.connectIDs = new ArrayList<String>(dataStream.connectIDs);
this.ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>(dataStream.ctypes);
this.cparams = new ArrayList<Integer>(dataStream.cparams);
this.iterationflag = dataStream.iterationflag;
this.iterationID = dataStream.iterationID;
}
/**
......@@ -115,25 +121,6 @@ public class DataStream<T extends Tuple> {
}
/**
* Creates an identical {@link DataStream}.
*
* @return The DataStream copy.
*/
public DataStream<T> copy() {
DataStream<T> copiedStream = new DataStream<T>(environment, "", getId());
copiedStream.type = this.type;
copiedStream.connectIDs = new ArrayList<String>(this.connectIDs);
copiedStream.userDefinedName = this.userDefinedName;
copiedStream.ctypes = new ArrayList<StreamExecutionEnvironment.ConnectionType>(this.ctypes);
copiedStream.cparams = new ArrayList<Integer>(this.cparams);
copiedStream.degreeOfParallelism = this.degreeOfParallelism;
copiedStream.iterationflag = this.iterationflag;
copiedStream.iterationID = this.iterationID;
return copiedStream;
}
/**
* Returns the ID of the {@link DataStream}.
*
......@@ -159,7 +146,7 @@ public class DataStream<T extends Tuple> {
environment.setOperatorParallelism(this);
return this.copy();
return new DataStream<T>(this);
}
......@@ -173,8 +160,9 @@ public class DataStream<T extends Tuple> {
}
/**
* Gives the data transformation a user defined name in order to use at
* directed outputs
* Gives the data transformation(vertex) a user defined name in order to use
* at directed outputs. The {@link OutputSelector} of the input vertex
* should use this name for directed emits.
*
* @param name
* The name to set
......@@ -202,7 +190,7 @@ public class DataStream<T extends Tuple> {
* @return The connected DataStream.
*/
public DataStream<T> connectWith(DataStream<T>... streams) {
DataStream<T> returnStream = copy();
DataStream<T> returnStream = new DataStream<T>(this);
for (DataStream<T> stream : streams) {
addConnection(returnStream, stream);
......@@ -218,6 +206,16 @@ public class DataStream<T extends Tuple> {
return returnStream;
}
/**
* Operator used for directing tuples to specific named outputs. Sets an
* {@link OutputSelector} for the vertex. The tuples emitted from this
* vertex will be sent to the output names selected by the OutputSelector.
* Unnamed outputs will not receive any tuples.
*
* @param outputSelector
* The user defined OutputSelector for directing the tuples.
* @return The directed DataStream.
*/
public DataStream<T> directTo(OutputSelector<T> outputSelector) {
this.outputSelector = outputSelector;
environment.addDirectedEmit(id, outputSelector);
......@@ -242,7 +240,7 @@ public class DataStream<T extends Tuple> {
// "The position of the field must be smaller than the number of fields in the Tuple");
// }
DataStream<T> returnStream = copy();
DataStream<T> returnStream = new DataStream<T>(this);
for (int i = 0; i < returnStream.ctypes.size(); i++) {
returnStream.ctypes.set(i, ConnectionType.FIELD);
......@@ -258,14 +256,14 @@ public class DataStream<T extends Tuple> {
* @return The DataStream with broadcast partitioning set.
*/
public DataStream<T> broadcast() {
DataStream<T> returnStream = copy();
DataStream<T> returnStream = new DataStream<T>(this);
for (int i = 0; i < returnStream.ctypes.size(); i++) {
returnStream.ctypes.set(i, ConnectionType.BROADCAST);
}
return returnStream;
}
/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are shuffled to the next component.
......@@ -273,14 +271,14 @@ public class DataStream<T extends Tuple> {
* @return The DataStream with shuffle partitioning set.
*/
public DataStream<T> shuffle() {
DataStream<T> returnStream = copy();
DataStream<T> returnStream = new DataStream<T>(this);
for (int i = 0; i < returnStream.ctypes.size(); i++) {
returnStream.ctypes.set(i, ConnectionType.SHUFFLE);
}
return returnStream;
}
/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are forwarded to the local subtask of the next component.
......@@ -288,7 +286,7 @@ public class DataStream<T extends Tuple> {
* @return The DataStream with shuffle partitioning set.
*/
public DataStream<T> forward() {
DataStream<T> returnStream = copy();
DataStream<T> returnStream = new DataStream<T>(this);
for (int i = 0; i < returnStream.ctypes.size(); i++) {
returnStream.ctypes.set(i, ConnectionType.FORWARD);
......@@ -309,7 +307,8 @@ public class DataStream<T extends Tuple> {
* @return The transformed DataStream.
*/
public <R extends Tuple> StreamOperator<T, R> map(MapFunction<T, R> mapper) {
return environment.addFunction("map", this.copy(), mapper, new MapInvokable<T, R>(mapper));
return environment.addFunction("map", new DataStream<T>(this), mapper,
new MapInvokable<T, R>(mapper));
}
......@@ -328,7 +327,7 @@ public class DataStream<T extends Tuple> {
* @return The transformed DataStream.
*/
public <R extends Tuple> StreamOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
return environment.addFunction("flatMap", this.copy(), flatMapper,
return environment.addFunction("flatMap", new DataStream<T>(this), flatMapper,
new FlatMapInvokable<T, R>(flatMapper));
}
......@@ -344,7 +343,7 @@ public class DataStream<T extends Tuple> {
* @return The filtered DataStream.
*/
public StreamOperator<T, T> filter(FilterFunction<T> filter) {
return environment.addFunction("filter", this.copy(), filter,
return environment.addFunction("filter", new DataStream<T>(this), filter,
new FilterInvokable<T>(filter));
}
......@@ -365,7 +364,7 @@ public class DataStream<T extends Tuple> {
*/
public <R extends Tuple> StreamOperator<T, R> batchReduce(GroupReduceFunction<T, R> reducer,
int batchSize) {
return environment.addFunction("batchReduce", this.copy(), reducer,
return environment.addFunction("batchReduce", new DataStream<T>(this), reducer,
new BatchReduceInvokable<T, R>(reducer, batchSize));
}
......@@ -379,7 +378,7 @@ public class DataStream<T extends Tuple> {
* @return The modified DataStream.
*/
public DataStream<T> addSink(SinkFunction<T> sinkFunction) {
return environment.addSink(this.copy(), sinkFunction);
return environment.addSink(new DataStream<T>(this), sinkFunction);
}
/**
......@@ -390,7 +389,7 @@ public class DataStream<T extends Tuple> {
* @return The closed DataStream.
*/
public DataStream<T> print() {
return environment.print(this.copy());
return environment.print(new DataStream<T>(this));
}
/**
......@@ -408,12 +407,12 @@ public class DataStream<T extends Tuple> {
* @return The iterative data stream created.
*/
public IterativeDataStream<T> iterate() {
return new IterativeDataStream<T>(copy());
return new IterativeDataStream<T>(this);
}
protected DataStream<T> addIterationSource(String iterationID) {
environment.addIterationSource(this, iterationID);
return this.copy();
return new DataStream<T>(this);
}
/**
......
......@@ -32,7 +32,7 @@ public class IterativeDataStream<T extends Tuple> extends StreamOperator<T, T> {
static Integer iterationCount = 0;
protected IterativeDataStream(DataStream<T> dataStream) {
super(dataStream.environment, "iteration", dataStream.id);
super(dataStream);
iterationID = iterationCount;
iterationCount++;
iterationflag = true;
......
......@@ -70,7 +70,8 @@ public class JobGraphBuilder {
// Graph attributes
private Map<String, AbstractJobVertex> components;
private Map<String, Integer> componentParallelism;
private Map<String, List<String>> edgeList;
private Map<String, List<String>> outEdgeList;
private Map<String, List<String>> inEdgeList;
private Map<String, List<StreamPartitioner<? extends Tuple>>> connectionTypes;
private Map<String, String> userDefinedNames;
private Map<String, String> operatorNames;
......@@ -79,6 +80,7 @@ public class JobGraphBuilder {
private Map<String, byte[]> outputSelectors;
private Map<String, Class<? extends AbstractInvokable>> componentClasses;
private Map<String, String> iterationIds;
private Map<String, String> iterationHeadNames;
private String maxParallelismVertexName;
private int maxParallelism;
......@@ -97,7 +99,8 @@ public class JobGraphBuilder {
components = new HashMap<String, AbstractJobVertex>();
componentParallelism = new HashMap<String, Integer>();
edgeList = new HashMap<String, List<String>>();
outEdgeList = new HashMap<String, List<String>>();
inEdgeList = new HashMap<String, List<String>>();
connectionTypes = new HashMap<String, List<StreamPartitioner<? extends Tuple>>>();
userDefinedNames = new HashMap<String, String>();
operatorNames = new HashMap<String, String>();
......@@ -106,6 +109,7 @@ public class JobGraphBuilder {
outputSelectors = new HashMap<String, byte[]>();
componentClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
iterationIds = new HashMap<String, String>();
iterationHeadNames = new HashMap<String, String>();
maxParallelismVertexName = "";
maxParallelism = 0;
......@@ -158,9 +162,13 @@ public class JobGraphBuilder {
addComponent(componentName, StreamIterationSource.class, null, null, null, parallelism);
iterationIds.put(componentName, iterationID);
iterationHeadNames.put(iterationID, componentName);
setBytesFrom(iterationHead, componentName);
setEdge(componentName, iterationHead,
connectionTypes.get(inEdgeList.get(iterationHead).get(0)).get(0));
if (LOG.isDebugEnabled()) {
LOG.debug("Iteration head source: " + componentName);
}
......@@ -275,7 +283,8 @@ public class JobGraphBuilder {
invokableObjects.put(componentName, invokableObject);
operatorNames.put(componentName, operatorName);
serializedFunctions.put(componentName, serializedFunction);
edgeList.put(componentName, new ArrayList<String>());
outEdgeList.put(componentName, new ArrayList<String>());
inEdgeList.put(componentName, new ArrayList<String>());
connectionTypes.put(componentName, new ArrayList<StreamPartitioner<? extends Tuple>>());
}
......@@ -403,6 +412,10 @@ public class JobGraphBuilder {
maxParallelism = parallelism;
maxParallelismVertexName = componentName;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Parallelism set: " + parallelism + " for " + componentName);
}
}
/**
......@@ -418,7 +431,8 @@ public class JobGraphBuilder {
*/
public void setEdge(String upStreamComponentName, String downStreamComponentName,
StreamPartitioner<? extends Tuple> partitionerObject) {
edgeList.get(upStreamComponentName).add(downStreamComponentName);
outEdgeList.get(upStreamComponentName).add(downStreamComponentName);
inEdgeList.get(downStreamComponentName).add(upStreamComponentName);
connectionTypes.get(upStreamComponentName).add(partitionerObject);
}
......@@ -439,7 +453,6 @@ public class JobGraphBuilder {
public <T extends Tuple> void broadcastConnect(DataStream<T> inputStream,
String upStreamComponentName, String downStreamComponentName) {
setEdge(upStreamComponentName, downStreamComponentName, new BroadcastPartitioner<T>());
LOG.info("Broadcastconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
......@@ -464,9 +477,6 @@ public class JobGraphBuilder {
setEdge(upStreamComponentName, downStreamComponentName, new FieldsPartitioner<T>(
keyPosition));
LOG.info("Fieldsconnected: " + upStreamComponentName + " to " + downStreamComponentName
+ "by" + keyPosition);
}
/**
......@@ -485,8 +495,6 @@ public class JobGraphBuilder {
public <T extends Tuple> void globalConnect(DataStream<T> inputStream,
String upStreamComponentName, String downStreamComponentName) {
setEdge(upStreamComponentName, downStreamComponentName, new GlobalPartitioner<T>());
LOG.info("Globalconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
......@@ -505,7 +513,6 @@ public class JobGraphBuilder {
public <T extends Tuple> void shuffleConnect(DataStream<T> inputStream,
String upStreamComponentName, String downStreamComponentName) {
setEdge(upStreamComponentName, downStreamComponentName, new ShufflePartitioner<T>());
LOG.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
......@@ -525,7 +532,6 @@ public class JobGraphBuilder {
public <T extends Tuple> void forwardConnect(DataStream<T> inputStream,
String upStreamComponentName, String downStreamComponentName) {
setEdge(upStreamComponentName, downStreamComponentName, new ForwardPartitioner<T>());
LOG.info("Shuffleconnected: " + upStreamComponentName + " to " + downStreamComponentName);
}
/**
......@@ -599,6 +605,20 @@ public class JobGraphBuilder {
}
}
/**
* Sets the parallelism of the iteration head of the given iteration id to
* the parallelism given.
*
* @param iterationID
* ID of the iteration
* @param parallelism
* Parallelism to set, typically the parallelism of the iteration
* tail.
*/
public void setIterationSourceParallelism(String iterationID, int parallelism) {
setParallelism(iterationHeadNames.get(iterationID), parallelism);
}
/**
* Sets a user defined {@link OutputSelector} for the given component. Used
* for directed emits.
......@@ -687,13 +707,13 @@ public class JobGraphBuilder {
*/
private void buildGraph() {
for (String componentName : edgeList.keySet()) {
for (String componentName : outEdgeList.keySet()) {
createVertex(componentName);
}
for (String upStreamComponentName : edgeList.keySet()) {
for (String upStreamComponentName : outEdgeList.keySet()) {
int i = 0;
for (String downStreamComponentName : edgeList.get(upStreamComponentName)) {
for (String downStreamComponentName : outEdgeList.get(upStreamComponentName)) {
connect(upStreamComponentName, downStreamComponentName,
connectionTypes.get(upStreamComponentName).get(i));
i++;
......
......@@ -190,7 +190,7 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder.addSource(returnStream.getId(), new FromElementsFunction<X>(data),
"elements", SerializationUtils.serialize(data[0]), 1);
return returnStream.copy();
return returnStream;
}
/**
......@@ -211,7 +211,7 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder.addSource(returnStream.getId(), new FromElementsFunction<X>(data),
"elements", SerializationUtils.serialize((Serializable) data.toArray()[0]), 1);
return returnStream.copy();
return returnStream;
}
/**
......@@ -245,7 +245,7 @@ public abstract class StreamExecutionEnvironment {
jobGraphBuilder.addSource(returnStream.getId(), sourceFunction, "source",
SerializationUtils.serialize(sourceFunction), parallelism);
return returnStream.copy();
return returnStream;
}
public <T extends Tuple> DataStream<T> addSource(SourceFunction<T> sourceFunction) {
......@@ -294,24 +294,25 @@ public abstract class StreamExecutionEnvironment {
protected <T extends Tuple, R extends Tuple> void addIterationSource(DataStream<T> inputStream,
String iterationID) {
DataStream<R> returnStream = new DataStream<R>(this, "iterationHead");
DataStream<R> returnStream = new DataStream<R>(this, "iterationSource");
jobGraphBuilder.addIterationSource(returnStream.getId(), inputStream.getId(), iterationID,
degreeOfParallelism);
jobGraphBuilder.shuffleConnect(inputStream, returnStream.getId(), inputStream.getId());
}
protected <T extends Tuple, R extends Tuple> void addIterationSink(DataStream<T> inputStream,
String iterationID) {
DataStream<R> returnStream = new DataStream<R>(this, "iterationTail");
DataStream<R> returnStream = new DataStream<R>(this, "iterationSink");
jobGraphBuilder.addIterationSink(returnStream.getId(), inputStream.getId(), iterationID,
degreeOfParallelism, "iterate");
inputStream.getParallelism(), "iterate");
jobGraphBuilder.setIterationSourceParallelism(iterationID, inputStream.getParallelism());
for (int i = 0; i < inputStream.connectIDs.size(); i++) {
String input = inputStream.connectIDs.get(i);
jobGraphBuilder.shuffleConnect(inputStream, input, returnStream.getId());
jobGraphBuilder.forwardConnect(inputStream, input, returnStream.getId());
}
}
......
......@@ -27,8 +27,8 @@ public class StreamOperator<IN extends Tuple, OUT extends Tuple> extends DataStr
super(environment, operatorType);
}
protected StreamOperator(StreamExecutionEnvironment environment, String operatorType, String id) {
super(environment, operatorType, id);
protected StreamOperator(DataStream<OUT> dataStream) {
super(dataStream);
}
}
......@@ -25,20 +25,38 @@ import java.util.Collection;
import org.apache.flink.api.java.tuple.Tuple;
/**
* Class for defining an OutputSelector for the directTo operator. Every output
* tuple of a directed DataStream will run through this operator to select
* outputs.
*
* @param <T>
* Type parameter of the directed tuples.
*/
public abstract class OutputSelector<T extends Tuple> implements Serializable {
private static final long serialVersionUID = 1L;
private Collection<String> outputs;
public OutputSelector() {
outputs = new ArrayList<String>();
}
Collection<String> getOutputs(T tuple) {
outputs.clear();
select(tuple, outputs);
return outputs;
}
/**
* Method for selecting output names for the emitted tuples when using the
* directTo operator. The tuple will be emitted only to output names which
* are added to the outputs collection.
*
* @param tuple
* Tuple for which the output selection should be made.
* @param outputs
* Selected output names should be added to this collection.
*/
public abstract void select(T tuple, Collection<String> outputs);
}
\ No newline at end of file
......@@ -50,7 +50,7 @@ public class StreamIterationSink<IN extends Tuple> extends AbstractStreamCompone
setSinkSerializer();
inputs = getConfigInputs();
iterationId = configuration.getString("iteration-id", "iteration-0");
dataChannel = BlockingQueueBroker.instance().getAndRemove(iterationId);
dataChannel = BlockingQueueBroker.instance().get(iterationId);
} catch (Exception e) {
throw new StreamComponentException(String.format(
"Cannot register inputs of StreamIterationSink %s", iterationId), e);
......
......@@ -68,8 +68,11 @@ public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComp
}
iterationId = configuration.getString("iteration-id", "iteration-0");
BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
try {
BlockingQueueBroker.instance().handIn(iterationId, dataChannel);
} catch (Exception e) {
}
}
@Override
......@@ -84,9 +87,8 @@ public class StreamIterationSource<OUT extends Tuple> extends AbstractStreamComp
while (true) {
@SuppressWarnings("unchecked")
StreamRecord<OUT> nextRecord = dataChannel.poll(3, TimeUnit.SECONDS);
if(nextRecord == null){
if (nextRecord == null) {
break;
}
nextRecord.setSeralizationDelegate(this.outSerializationDelegate);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册