提交 82c42002 编写于 作者: G Gyula Fora

[FLINK-1434] [FLINK-1401] Streaming support added for webclient

Closes #334
上级 b263932e
......@@ -48,6 +48,12 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.commons.json</artifactId>
<version>2.0.6</version>
</dependency>
</dependencies>
<build>
......
......@@ -17,6 +17,10 @@
package org.apache.flink.streaming.api;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
......@@ -27,6 +31,7 @@ import java.util.Set;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.compiler.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.api.collector.OutputSelector;
......@@ -39,18 +44,22 @@ import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
import org.apache.flink.streaming.api.streamvertex.StreamVertex;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.state.OperatorState;
import org.apache.sling.commons.json.JSONArray;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.commons.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Object for building Apache Flink stream processing graphs
*/
public class StreamGraph {
public class StreamGraph extends StreamingPlan {
private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
protected boolean chaining = true;
private String jobName = DEAFULT_JOB_NAME;
// Graph attributes
private Map<String, Integer> operatorParallelisms;
......@@ -440,7 +449,7 @@ public class StreamGraph {
* Gets the assembled {@link JobGraph} and adds a default name for it.
*/
public JobGraph getJobGraph() {
return getJobGraph(DEAFULT_JOB_NAME);
return getJobGraph(jobName);
}
/**
......@@ -452,11 +461,16 @@ public class StreamGraph {
*/
public JobGraph getJobGraph(String jobGraphName) {
this.jobName = jobGraphName;
StreamingJobGraphGenerator optimizer = new StreamingJobGraphGenerator(this);
return optimizer.createJobGraph(jobGraphName);
}
public void setJobName(String jobName) {
this.jobName = jobName;
}
public void setChaining(boolean chaining) {
this.chaining = chaining;
}
......@@ -525,4 +539,82 @@ public class StreamGraph {
return iterationTimeouts.get(vertexName);
}
public String getOperatorName(String vertexName) {
return operatorNames.get(vertexName);
}
@Override
public String getStreamingPlanAsJSON() {
try {
JSONObject json = new JSONObject();
JSONArray nodes = new JSONArray();
json.put("nodes", nodes);
for (String id : operatorNames.keySet()) {
JSONObject node = new JSONObject();
nodes.put(node);
node.put("id", Integer.valueOf(id));
node.put("type", getOperatorName(id));
if (sources.contains(id)) {
node.put("pact", "Data Source");
} else {
node.put("pact", "Data Stream");
}
node.put("contents", getOperatorName(id) + " at "
+ getInvokable(id).getUserFunction().getClass().getSimpleName());
node.put("parallelism", getParallelism(id));
int numIn = getInEdges(id).size();
if (numIn > 0) {
JSONArray inputs = new JSONArray();
node.put("predecessors", inputs);
for (int i = 0; i < numIn; i++) {
String inID = getInEdges(id).get(i);
JSONObject input = new JSONObject();
inputs.put(input);
input.put("id", Integer.valueOf(inID));
input.put("ship_strategy", getOutPartitioner(inID, id).getStrategy());
if (i == 0) {
input.put("side", "first");
} else if (i == 1) {
input.put("side", "second");
}
}
}
}
return json.toString();
} catch (JSONException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("JSON plan creation failed: {}", e);
}
return "";
}
}
@Override
public void dumpStreamingPlanAsJSON(File file) throws IOException {
PrintWriter pw = null;
try {
pw = new PrintWriter(new FileOutputStream(file), false);
pw.write(getStreamingPlanAsJSON());
pw.flush();
} finally {
if (pw != null) {
pw.close();
}
}
}
}
......@@ -141,13 +141,13 @@ public class StreamingJobGraphGenerator {
}
}
private String createChainedName(String vertexName, List<String> chainedOutputs) {
private String createChainedName(String vertexID, List<String> chainedOutputs) {
String vertexName = streamGraph.getOperatorName(vertexID);
if (chainedOutputs.size() > 1) {
List<String> outputChainedNames = new ArrayList<String>();
for (String chainable : chainedOutputs) {
outputChainedNames.add(chainedNames.get(chainable));
}
return vertexName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
} else if (chainedOutputs.size() == 1) {
return vertexName + " -> " + chainedNames.get(chainedOutputs.get(0));
......@@ -162,7 +162,10 @@ public class StreamingJobGraphGenerator {
AbstractJobVertex vertex = new AbstractJobVertex(chainedNames.get(vertexName));
vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexName));
vertex.setParallelism(streamGraph.getParallelism(vertexName));
if (streamGraph.getParallelism(vertexName) > 0) {
vertex.setParallelism(streamGraph.getParallelism(vertexName));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Parallelism set: {} for {}", streamGraph.getParallelism(vertexName),
vertexName);
......
......@@ -244,7 +244,7 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoMapFunction.class,
coMapper.getClass(), 2, null, null);
return addCoFunction("coMap", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
return addCoFunction("Co-Map", outTypeInfo, new CoMapInvokable<IN1, IN2, OUT>(
clean(coMapper)));
}
......@@ -269,7 +269,7 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
coFlatMapper.getClass(), 2, null, null);
return addCoFunction("coFlatMap", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>(
return addCoFunction("Co-Flat Map", outTypeInfo, new CoFlatMapInvokable<IN1, IN2, OUT>(
clean(coFlatMapper)));
}
......@@ -294,7 +294,7 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoReduceFunction.class,
coReducer.getClass(), 2, null, null);
return addCoFunction("coReduce", outTypeInfo, getReduceInvokable(clean(coReducer)));
return addCoFunction("Co-Reduce", outTypeInfo, getReduceInvokable(clean(coReducer)));
}
......@@ -361,7 +361,7 @@ public class ConnectedDataStream<IN1, IN2> {
TypeInformation<OUT> outTypeInfo = TypeExtractor.createTypeInfo(CoWindowFunction.class,
coWindowFunction.getClass(), 2, null, null);
return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
}
......@@ -390,7 +390,7 @@ public class ConnectedDataStream<IN1, IN2> {
throw new IllegalArgumentException("Slide interval must be positive");
}
return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
return addCoFunction("Co-Window", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
}
......
......@@ -95,6 +95,7 @@ public class DataStream<OUT> {
protected static Integer counter = 0;
protected final StreamExecutionEnvironment environment;
protected final String id;
protected final String type;
protected int degreeOfParallelism;
protected List<String> userDefinedNames;
protected StreamPartitioner<OUT> partitioner;
......@@ -122,7 +123,8 @@ public class DataStream<OUT> {
}
counter++;
this.id = operatorType + "-" + counter.toString();
this.id = counter.toString();
this.type = operatorType;
this.environment = environment;
this.degreeOfParallelism = environment.getDegreeOfParallelism();
this.streamGraph = environment.getStreamGraph();
......@@ -142,6 +144,7 @@ public class DataStream<OUT> {
public DataStream(DataStream<OUT> dataStream) {
this.environment = dataStream.environment;
this.id = dataStream.id;
this.type = dataStream.type;
this.degreeOfParallelism = dataStream.degreeOfParallelism;
this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
this.partitioner = dataStream.partitioner;
......@@ -465,7 +468,7 @@ public class DataStream<OUT> {
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType());
return transform("map", outType, new MapInvokable<OUT, R>(clean(mapper)));
return transform("Map", outType, new MapInvokable<OUT, R>(clean(mapper)));
}
/**
......@@ -489,7 +492,7 @@ public class DataStream<OUT> {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType());
return transform("flatMap", outType, new FlatMapInvokable<OUT, R>(clean(flatMapper)));
return transform("Flat Map", outType, new FlatMapInvokable<OUT, R>(clean(flatMapper)));
}
......@@ -506,7 +509,7 @@ public class DataStream<OUT> {
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
return transform("reduce", getType(), new StreamReduceInvokable<OUT>(clean(reducer)));
return transform("Reduce", getType(), new StreamReduceInvokable<OUT>(clean(reducer)));
}
......@@ -525,7 +528,7 @@ public class DataStream<OUT> {
* @return The filtered DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> filter(FilterFunction<OUT> filter) {
return transform("filter", getType(), new FilterInvokable<OUT>(clean(filter)));
return transform("Filter", getType(), new FilterInvokable<OUT>(clean(filter)));
}
......@@ -834,7 +837,7 @@ public class DataStream<OUT> {
public SingleOutputStreamOperator<Long, ?> count() {
TypeInformation<Long> outTypeInfo = TypeExtractor.getForObject(Long.valueOf(0));
return transform("counter", outTypeInfo, new CounterInvokable<OUT>());
return transform("Count", outTypeInfo, new CounterInvokable<OUT>());
}
/**
......@@ -1090,12 +1093,12 @@ public class DataStream<OUT> {
StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
SingleOutputStreamOperator<OUT, ?> returnStream = transform("reduce", getType(), invokable);
SingleOutputStreamOperator<OUT, ?> returnStream = transform("Aggregation", getType(),
invokable);
return returnStream;
}
/**
* Method for passing user defined invokables along with the type
* informations that will transform the DataStream.
......@@ -1179,8 +1182,8 @@ public class DataStream<OUT> {
DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(),
sinkInvokable);
streamGraph.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null, "sink",
returnStream.getParallelism());
streamGraph.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null,
"Stream Sink", returnStream.getParallelism());
this.connectGraph(this.copy(), returnStream.getId(), 0);
......
......@@ -66,7 +66,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
*/
@Override
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) {
return transform("groupReduce", getType(), new GroupedReduceInvokable<OUT>(clean(reducer),
return transform("Grouped Reduce", getType(), new GroupedReduceInvokable<OUT>(clean(reducer),
keySelector));
}
......@@ -186,7 +186,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate),
keySelector);
SingleOutputStreamOperator<OUT, ?> returnStream = transform("groupReduce", getType(),
SingleOutputStreamOperator<OUT, ?> returnStream = transform("Grouped Aggregation", getType(),
invokable);
return returnStream;
......
......@@ -83,7 +83,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple1<T0>> outType = (TypeInformation<Tuple1<T0>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream.transform("projection", outType, new ProjectInvokable<IN, Tuple1<T0>>(
return dataStream.transform("Projection", outType, new ProjectInvokable<IN, Tuple1<T0>>(
fieldIndexes, outType));
}
......@@ -111,7 +111,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple2<T0, T1>> outType = (TypeInformation<Tuple2<T0, T1>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream.transform("projection", outType,
return dataStream.transform("Projection", outType,
new ProjectInvokable<IN, Tuple2<T0, T1>>(fieldIndexes, outType));
}
......@@ -141,7 +141,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple3<T0, T1, T2>> outType = (TypeInformation<Tuple3<T0, T1, T2>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream.transform("projection", outType,
return dataStream.transform("Projection", outType,
new ProjectInvokable<IN, Tuple3<T0, T1, T2>>(fieldIndexes, outType));
}
......@@ -173,7 +173,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple4<T0, T1, T2, T3>> outType = (TypeInformation<Tuple4<T0, T1, T2, T3>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream.transform("projection", outType,
return dataStream.transform("Projection", outType,
new ProjectInvokable<IN, Tuple4<T0, T1, T2, T3>>(fieldIndexes, outType));
}
......@@ -206,7 +206,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple5<T0, T1, T2, T3, T4>> outType = (TypeInformation<Tuple5<T0, T1, T2, T3, T4>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream.transform("projection", outType,
return dataStream.transform("Projection", outType,
new ProjectInvokable<IN, Tuple5<T0, T1, T2, T3, T4>>(fieldIndexes, outType));
}
......@@ -243,7 +243,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>> outType = (TypeInformation<Tuple6<T0, T1, T2, T3, T4, T5>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream.transform("projection", outType,
return dataStream.transform("Projection", outType,
new ProjectInvokable<IN, Tuple6<T0, T1, T2, T3, T4, T5>>(fieldIndexes, outType));
}
......@@ -283,7 +283,7 @@ public class StreamProjection<IN> {
TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>> outType = (TypeInformation<Tuple7<T0, T1, T2, T3, T4, T5, T6>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream
.transform("projection", outType,
.transform("Projection", outType,
new ProjectInvokable<IN, Tuple7<T0, T1, T2, T3, T4, T5, T6>>(fieldIndexes,
outType));
}
......@@ -325,7 +325,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> outType = (TypeInformation<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream.transform("projection", outType,
return dataStream.transform("Projection", outType,
new ProjectInvokable<IN, Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>>(fieldIndexes,
outType));
}
......@@ -369,7 +369,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> outType = (TypeInformation<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream.transform("projection", outType,
return dataStream.transform("Projection", outType,
new ProjectInvokable<IN, Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>>(fieldIndexes,
outType));
}
......@@ -415,7 +415,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> outType = (TypeInformation<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream.transform("projection", outType,
return dataStream.transform("Projection", outType,
new ProjectInvokable<IN, Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>>(
fieldIndexes, outType));
}
......@@ -465,7 +465,7 @@ public class StreamProjection<IN> {
@SuppressWarnings("unchecked")
TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> outType = (TypeInformation<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>) extractFieldTypes(
fieldIndexes, types, inTypeInfo);
return dataStream.transform("projection", outType,
return dataStream.transform("Projection", outType,
new ProjectInvokable<IN, Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>>(
fieldIndexes, outType));
}
......@@ -519,7 +519,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>>(
fieldIndexes, outType));
......@@ -576,7 +576,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>>(
fieldIndexes, outType));
......@@ -635,7 +635,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>>(
fieldIndexes, outType));
......@@ -697,7 +697,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>>(
fieldIndexes, outType));
......@@ -761,7 +761,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>>(
fieldIndexes, outType));
......@@ -827,7 +827,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>>(
fieldIndexes, outType));
......@@ -895,7 +895,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>>(
fieldIndexes, outType));
......@@ -966,7 +966,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>>(
fieldIndexes, outType));
......@@ -1039,7 +1039,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>>(
fieldIndexes, outType));
......@@ -1115,7 +1115,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>>(
fieldIndexes, outType));
......@@ -1193,7 +1193,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>>(
fieldIndexes, outType));
......@@ -1274,7 +1274,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>>(
fieldIndexes, outType));
......@@ -1357,7 +1357,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>>(
fieldIndexes, outType));
......@@ -1442,7 +1442,7 @@ public class StreamProjection<IN> {
fieldIndexes, types, inTypeInfo);
return dataStream
.transform(
"projection",
"Projection",
outType,
new ProjectInvokable<IN, Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>>(
......
......@@ -231,7 +231,7 @@ public class WindowedDataStream<OUT> {
* @return The transformed DataStream
*/
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
return dataStream.transform("WindowReduce", getType(),
return dataStream.transform("Window-Reduce", getType(),
getReduceInvokable(reduceFunction));
}
......@@ -279,7 +279,7 @@ public class WindowedDataStream<OUT> {
public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
GroupReduceFunction<OUT, R> reduceFunction, TypeInformation<R> outType) {
return dataStream.transform("WindowReduce", outType,
return dataStream.transform("Window-Reduce", outType,
getReduceGroupInvokable(reduceFunction));
}
......@@ -507,7 +507,7 @@ public class WindowedDataStream<OUT> {
private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) {
StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.transform("windowReduce",
SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.transform("Window-Aggregation",
getType(), invokable);
return returnStream;
......
......@@ -32,7 +32,9 @@ import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.StreamGraph;
import org.apache.flink.streaming.api.datastream.DataStream;
......@@ -267,7 +269,7 @@ public abstract class StreamExecutionEnvironment {
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
return addSource(function, outTypeInfo, "elements");
return addSource(function, outTypeInfo, "Elements source");
}
/**
......@@ -294,7 +296,7 @@ public abstract class StreamExecutionEnvironment {
TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
return addSource(function, outTypeInfo, "collection");
return addSource(function, outTypeInfo, "Collection Source");
}
/**
......@@ -313,7 +315,7 @@ public abstract class StreamExecutionEnvironment {
*/
public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter), null,
"socketStream");
"Socket Stream");
}
/**
......@@ -345,13 +347,13 @@ public abstract class StreamExecutionEnvironment {
if (from > to) {
throw new IllegalArgumentException("Start of sequence must not be greater than the end");
}
return addSource(new GenSequenceFunction(from, to), null, "sequence");
return addSource(new GenSequenceFunction(from, to), null, "Sequence Source");
}
private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
TypeInformation<String> typeInfo) {
FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
DataStreamSource<String> returnStream = addSource(function, null, "fileSource");
DataStreamSource<String> returnStream = addSource(function, null, "File Source");
streamGraph.setInputFormat(returnStream.getId(), inputFormat);
return returnStream;
}
......@@ -397,7 +399,7 @@ public abstract class StreamExecutionEnvironment {
*/
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
TypeInformation<OUT> outTypeInfo) {
return addSource(function, outTypeInfo, function.getClass().getName());
return addSource(function, outTypeInfo, "Custom Source");
}
/**
......@@ -462,6 +464,8 @@ public abstract class StreamExecutionEnvironment {
ContextEnvironment ctx = (ContextEnvironment) env;
return createContextEnvironment(ctx.getClient(), ctx.getJars(),
ctx.getDegreeOfParallelism());
} else if (env instanceof OptimizerPlanEnvironment | env instanceof PreviewPlanEnvironment) {
return new StreamPlanEnvironment(env);
} else {
return createLocalEnvironment();
}
......@@ -592,4 +596,16 @@ public abstract class StreamExecutionEnvironment {
return streamGraph;
}
/**
* Creates the plan with which the system will execute the program, and
* returns it as a String using a JSON representation of the execution data
* flow graph. Note that this needs to be called, before the plan is
* executed.
*
* @return The execution plan of the program, as a JSON String.
*/
public String getExecutionPlan() {
return getStreamGraph().getStreamingPlanAsJSON();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.environment;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
public class StreamPlanEnvironment extends StreamExecutionEnvironment {
private ExecutionEnvironment env;
protected StreamPlanEnvironment(ExecutionEnvironment env) {
super();
this.env = env;
int dop = env.getDegreeOfParallelism();
if (dop > 0) {
setDegreeOfParallelism(dop);
} else {
setDegreeOfParallelism(GlobalConfiguration.getInteger(
ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
}
}
@Override
public void execute() throws Exception {
execute("");
}
@Override
public void execute(String jobName) throws Exception {
streamGraph.setJobName(jobName);
if (env instanceof OptimizerPlanEnvironment) {
((OptimizerPlanEnvironment) env).setPlan(streamGraph);
} else if (env instanceof PreviewPlanEnvironment) {
((PreviewPlanEnvironment) env).setPreview(streamGraph.getStreamingPlanAsJSON());
}
throw new Client.ProgramAbortException();
}
}
......@@ -179,4 +179,8 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
public static enum ChainingStrategy {
ALWAYS, NEVER, HEAD;
}
public Function getUserFunction() {
return userFunction;
}
}
......@@ -35,7 +35,7 @@ public class FieldsPartitioner<T> extends StreamPartitioner<T> {
KeySelector<T, ?> keySelector;
public FieldsPartitioner(KeySelector<T, ?> keySelector) {
super(PartitioningStrategy.FIELDS);
super(PartitioningStrategy.GROUPBY);
this.keySelector = keySelector;
}
......
......@@ -27,7 +27,7 @@ public abstract class StreamPartitioner<T> implements
public enum PartitioningStrategy {
FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, FIELDS;
FORWARD, DISTRIBUTE, SHUFFLE, BROADCAST, GLOBAL, GROUPBY;
}
......
......@@ -203,6 +203,15 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
def execute(jobName: String) = javaEnv.execute(jobName)
/**
* Creates the plan with which the system will execute the program, and
* returns it as a String using a JSON representation of the execution data
* flow graph. Note that this needs to be called, before the plan is
* executed.
*
*/
def getExecutionPlan() = javaEnv.getStreamGraph().getStreamingPlanAsJSON();
}
object StreamExecutionEnvironment {
......
......@@ -103,7 +103,7 @@ public class RemoteExecutor extends PlanExecutor {
JobWithJars p = new JobWithJars(plan, this.jarFiles);
Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader());
OptimizedPlan op = c.getOptimizedPlan(p, -1);
OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
return jsonGen.getOptimizerPlanAsJSON(op);
}
......
......@@ -26,20 +26,18 @@ import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.contextcheck.ContextChecker;
import org.apache.flink.compiler.costs.DefaultCostEstimator;
import org.apache.flink.compiler.plan.FlinkPlan;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.StreamingPlan;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
import org.apache.flink.configuration.ConfigConstants;
......@@ -49,13 +47,17 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionFailure;
import org.apache.flink.runtime.messages.JobManagerMessages.SubmissionResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.google.common.base.Preconditions;
/**
* Encapsulates the functionality necessary to submit a program to a remote cluster.
......@@ -133,10 +135,10 @@ public class Client {
public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
return jsonGen.getOptimizerPlanAsJSON(getOptimizedPlan(prog, parallelism));
return jsonGen.getOptimizerPlanAsJSON((OptimizedPlan) getOptimizedPlan(prog, parallelism));
}
public OptimizedPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
public FlinkPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
return getOptimizedPlan(prog.getPlanWithJars(), parallelism);
......@@ -189,7 +191,7 @@ public class Client {
}
}
public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
p.setDefaultParallelism(parallelism);
}
......@@ -208,25 +210,31 @@ public class Client {
* @throws CompilerException Thrown, if the compiler encounters an illegal situation.
* @throws ProgramInvocationException Thrown, if the program could not be instantiated from its jar file.
*/
public OptimizedPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
public FlinkPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
return getOptimizedPlan(prog.getPlan(), parallelism);
}
public JobGraph getJobGraph(PackagedProgram prog, OptimizedPlan optPlan) throws ProgramInvocationException {
public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
return getJobGraph(optPlan, prog.getAllLibraries());
}
private JobGraph getJobGraph(OptimizedPlan optPlan, List<File> jarFiles) {
NepheleJobGraphGenerator gen = new NepheleJobGraphGenerator();
JobGraph job = gen.compileJobGraph(optPlan);
private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
JobGraph job = null;
if (optPlan instanceof StreamingPlan) {
job = ((StreamingPlan) optPlan).getJobGraph();
} else {
NepheleJobGraphGenerator gen = new NepheleJobGraphGenerator();
job = gen.compileJobGraph((OptimizedPlan) optPlan);
}
for (File jar : jarFiles) {
job.addJar(new Path(jar.getAbsolutePath()));
}
return job;
}
public JobExecutionResult run(final PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
if (prog.isUsingProgramEntryPoint()) {
......@@ -287,7 +295,7 @@ public class Client {
* on the nephele system failed.
*/
public JobExecutionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
return run(getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait);
return run((OptimizedPlan) getOptimizedPlan(prog, parallelism), prog.getJarFiles(), wait);
}
......@@ -346,11 +354,11 @@ public class Client {
// --------------------------------------------------------------------------------------------
private static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
private final PactCompiler compiler;
private OptimizedPlan optimizerPlan;
private FlinkPlan optimizerPlan;
private OptimizerPlanEnvironment(PactCompiler compiler) {
......@@ -385,6 +393,10 @@ public class Client {
};
initializeContextEnvironment(factory);
}
public void setPlan(FlinkPlan plan){
this.optimizerPlan = plan;
}
}
public static final class ProgramAbortException extends Error {
......
......@@ -59,7 +59,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
public String getExecutionPlan() throws Exception {
Plan p = createProgramPlan("unnamed job");
OptimizedPlan op = this.client.getOptimizedPlan(p, getDegreeOfParallelism());
OptimizedPlan op = (OptimizedPlan) this.client.getOptimizedPlan(p, getDegreeOfParallelism());
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
return gen.getOptimizerPlanAsJSON(op);
......
......@@ -44,12 +44,12 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.dag.DataSinkNode;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
import org.apache.flink.util.InstantiationUtil;
/**
* This class encapsulates represents a program, packaged in a jar file. It supplies
......@@ -271,7 +271,9 @@ public class PackagedProgram {
catch (Throwable t) {
// the invocation gets aborted with the preview plan
if (env.previewPlan != null) {
previewPlan = env.previewPlan;
previewPlan = env.previewPlan;
} else if (env.preview != null) {
return env.preview;
} else {
throw new ProgramInvocationException("The program caused an error: ", t);
}
......@@ -290,7 +292,7 @@ public class PackagedProgram {
else {
throw new RuntimeException();
}
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
StringWriter string = new StringWriter(1024);
PrintWriter pw = null;
......@@ -301,6 +303,7 @@ public class PackagedProgram {
pw.close();
}
return string.toString();
}
/**
......@@ -690,10 +693,12 @@ public class PackagedProgram {
private List<DataSinkNode> previewPlan;
private Plan plan;
private String preview = null;
@Override
public JobExecutionResult execute(String jobName) throws Exception {
this.plan = createProgramPlan(jobName);
this.previewPlan = PactCompiler.createPreOptimizedPlan(plan);
this.previewPlan = PactCompiler.createPreOptimizedPlan((Plan) plan);
// do not go on with anything now!
throw new Client.ProgramAbortException();
......@@ -721,5 +726,10 @@ public class PackagedProgram {
public Plan getPlan() {
return this.plan;
}
public void setPreview(String preview) {
this.preview = preview;
}
}
}
......@@ -40,7 +40,9 @@ import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.plan.FlinkPlan;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.StreamingPlan;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
......@@ -166,7 +168,7 @@ public class JobSubmissionServlet extends HttpServlet {
// create the plan
String[] options = params.isEmpty() ? new String[0] : (String[]) params.toArray(new String[params.size()]);
PackagedProgram program;
OptimizedPlan optPlan;
FlinkPlan optPlan;
Client client;
try {
......@@ -246,14 +248,18 @@ public class JobSubmissionServlet extends HttpServlet {
String planName = uid + ".json";
File jsonFile = new File(this.planDumpDirectory, planName);
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
jsonGen.setEncodeForHTML(true);
jsonGen.dumpOptimizerPlanAsJSON(optPlan, jsonFile);
if (optPlan instanceof StreamingPlan) {
((StreamingPlan) optPlan).dumpStreamingPlanAsJSON(jsonFile);
} else {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
jsonGen.setEncodeForHTML(true);
jsonGen.dumpOptimizerPlanAsJSON((OptimizedPlan) optPlan, jsonFile);
}
// submit the job only, if it should not be suspended
if (!suspend) {
try {
client.run(program, optPlan, false);
client.run(program,(OptimizedPlan) optPlan, false);
} catch (Throwable t) {
LOG.error("Error submitting job to the job-manager.", t);
showErrorPage(resp, t.getMessage());
......
......@@ -45,7 +45,7 @@ public class ExecutionPlanCreationTest {
InetSocketAddress mockJmAddress = new InetSocketAddress(mockAddress, 12345);
Client client = new Client(mockJmAddress, new Configuration(), getClass().getClassLoader());
OptimizedPlan op = client.getOptimizedPlan(prg, -1);
OptimizedPlan op = (OptimizedPlan) client.getOptimizedPlan(prg, -1);
assertNotNull(op);
PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.compiler.plan;
/**
* A common interface for compiled Flink plans for both batch and streaming
* processing programs.
*
*/
public interface FlinkPlan {
}
......@@ -30,7 +30,7 @@ import org.apache.flink.util.Visitor;
* It works on this representation during its optimization. Finally, this plan is translated to a schedule
* for the nephele runtime system.
*/
public class OptimizedPlan implements Visitable<PlanNode> {
public class OptimizedPlan implements FlinkPlan, Visitable<PlanNode> {
/**
* The data sources in the plan.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.compiler.plan;
import java.io.File;
import java.io.IOException;
import org.apache.flink.runtime.jobgraph.JobGraph;
/**
* Abstract class representing Flink Streaming plans
*
*/
public abstract class StreamingPlan implements FlinkPlan {
public abstract JobGraph getJobGraph();
public abstract String getStreamingPlanAsJSON();
public abstract void dumpStreamingPlanAsJSON(File file) throws IOException;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册