diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
index f358de9b2dbe89996d12429301e3f3e37e1269e4..c8262746212ad7423cac2fc5ef8b3f59fd435e58 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/JobGraphBuilder.java
@@ -505,6 +505,15 @@ public class JobGraphBuilder {
}
+ public void setInvokable(String id, StreamInvokable invokableObject) {
+ invokableObjects.put(id, invokableObject);
+ }
+
+ public void setOutType(String id, TypeInformation outType) {
+ StreamRecordSerializer serializer = new StreamRecordSerializer(outType);
+ typeSerializersOut1.put(id, serializer);
+ }
+
/**
* Sets TypeSerializerWrapper from one vertex to another, used with some
* sinks.
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 65a6c37b71c829ed4261f929584632e79c9ac7de..39b6460f01bdf32669179dabf204c192749a8e32 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -539,7 +539,7 @@ public class ConnectedDataStream {
return invokable;
}
- protected SingleOutputStreamOperator addGeneralWindowCombine(
+ public SingleOutputStreamOperator addGeneralWindowCombine(
CoWindowFunction coWindowFunction, TypeInformation outTypeInfo,
long windowSize, long slideInterval, TimeStamp timestamp1,
TimeStamp timestamp2) {
@@ -550,7 +550,7 @@ public class ConnectedDataStream {
if (slideInterval < 1) {
throw new IllegalArgumentException("Slide interval must be positive");
}
-
+
return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index a23631245df7c3e9a90d1d3f3b4e1977ef4da3eb..2a0b67376e12223be17387c9f48c171f109a7684 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -92,7 +92,8 @@ public class DataStream {
protected List userDefinedNames;
protected boolean selectAll;
protected StreamPartitioner partitioner;
- protected final TypeInformation typeInfo;
+ @SuppressWarnings("rawtypes")
+ protected TypeInformation typeInfo;
protected List> mergedStreams;
protected final JobGraphBuilder jobGraphBuilder;
@@ -175,10 +176,18 @@ public class DataStream {
*
* @return The type of the datastream.
*/
+ @SuppressWarnings("unchecked")
public TypeInformation getType() {
return this.typeInfo;
}
+ @SuppressWarnings("unchecked")
+ public DataStream setType(TypeInformation outType) {
+ jobGraphBuilder.setOutType(id, outType);
+ typeInfo = outType;
+ return (DataStream) this;
+ }
+
public F clean(F f) {
if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
ClosureCleaner.clean(f, true);
@@ -979,7 +988,7 @@ public class DataStream {
StreamReduceInvokable invokable = new StreamReduceInvokable(aggregate);
- SingleOutputStreamOperator returnStream = transform("reduce", typeInfo, invokable);
+ SingleOutputStreamOperator returnStream = transform("reduce", getType(), invokable);
return returnStream;
}
@@ -1077,7 +1086,7 @@ public class DataStream {
*/
public DataStreamSink addSink(SinkFunction sinkFunction) {
- DataStreamSink returnStream = new DataStreamSink(environment, "sink", typeInfo);
+ DataStreamSink returnStream = new DataStreamSink(environment, "sink", getType());
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable(
clean(sinkFunction)), getType(), null, "sink", degreeOfParallelism);
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
index 18b4b75392cb3b94ee8ed9991af2110dfd456739..160ef8d311eee9a9a45383576048869589bbd64c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java
@@ -186,7 +186,7 @@ public class GroupedDataStream extends DataStream {
GroupedReduceInvokable invokable = new GroupedReduceInvokable(clean(aggregate),
keySelector);
- SingleOutputStreamOperator returnStream = transform("groupReduce", typeInfo,
+ SingleOutputStreamOperator returnStream = transform("groupReduce", getType(),
invokable);
return returnStream;
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 016322bf4f236ea735de15de2eac3c108be94f57..c19517bb74efd90e816ac40541fe956d68e5554d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.state.OperatorState;
/**
* The SingleOutputStreamOperator represents a user defined transformation
* applied on a {@link DataStream} with one predefined output type.
- *
+ *
* @param
* Output type of the operator.
* @param
@@ -52,6 +52,13 @@ public class SingleOutputStreamOperator SingleOutputStreamOperator setType(TypeInformation outType) {
+ jobGraphBuilder.setOutType(id, outType);
+ typeInfo = outType;
+ return (SingleOutputStreamOperator) this;
+ }
+
/**
* Sets the degree of parallelism for this operator. The degree must be 1 or
* more.
@@ -71,7 +78,6 @@ public class SingleOutputStreamOperator extends
}
/**
- * Continues a temporal Join transformation and defines the
- * {@link Tuple} fields of the second join {@link DataStream} that
- * should be used as join keys.
- * Note: Fields can only be selected as join keys on Tuple
- * DataStreams.
+ * Creates a temporal Join transformation and defines the {@link Tuple}
+ * fields of the second join {@link DataStream} that should be used as
+ * join keys.
+ *
The resulting operator wraps each pair of joining elements in a
+ * Tuple2(first, second). To use a different wrapping function
+ * use {@link JoinedStream#with(JoinFunction)}
*
* @param fields
* The indexes of the Tuple fields of the second join
* DataStream that should be used as keys.
- * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
- * {@link FinalizeStreamJoin#withDefault} to complete
+ * @return A streaming join operator. Call {@link JoinedStream#with} to
+ * apply a custom wrapping
*/
- public FinalizeStreamJoin equalTo(int... fields) {
+ public JoinedStream equalTo(int... fields) {
keys2 = FieldsKeySelector.getSelector(op.input2.getType(), fields);
- return new FinalizeStreamJoin(this);
+ return createJoinOperator();
}
/**
- * Continues a temporal Join transformation and defines the fields of
- * the second join {@link DataStream} that should be used as join keys.
+ * Creates a temporal Join transformation and defines the fields of the
+ * second join {@link DataStream} that should be used as join keys.
+ * The resulting operator wraps each pair of joining elements in a
+ * Tuple2(first, second). To use a different wrapping function
+ * use {@link JoinedStream#with(JoinFunction)}
*
* @param fields
* The fields of the second join DataStream that should be
* used as keys.
- * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
- * {@link FinalizeStreamJoin#withDefault} to complete
+ * @return A streaming join operator. Call {@link JoinedStream#with} to
+ * apply a custom wrapping
*/
- public FinalizeStreamJoin equalTo(String... fields) {
+ public JoinedStream equalTo(String... fields) {
this.keys2 = new PojoKeySelector(op.input2.getType(), fields);
- return new FinalizeStreamJoin(this);
+ return createJoinOperator();
}
/**
- * Continues a temporal Join transformation and defines a
+ * Creates a temporal Join transformation and defines a
* {@link KeySelector} function for the second join {@link DataStream}
* . The KeySelector function is called for each element of the
* second DataStream and extracts a single key value on which the
- * DataStream is joined.
+ * DataStream is joined. The resulting operator wraps each pair of
+ * joining elements in a Tuple2(first, second). To use a
+ * different wrapping function use
+ * {@link JoinedStream#with(JoinFunction)}
+ *
*
* @param keySelector
* The KeySelector function which extracts the key values
* from the second DataStream on which it is joined.
- * @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
- * {@link FinalizeStreamJoin#withDefault} to complete
+ * @return A streaming join operator. Call {@link JoinedStream#with} to
+ * apply a custom wrapping
*/
- public FinalizeStreamJoin equalTo(KeySelector keySelector) {
+ public JoinedStream equalTo(KeySelector keySelector) {
this.keys2 = keySelector;
- return new FinalizeStreamJoin(this);
+ return createJoinOperator();
}
+ private JoinedStream createJoinOperator() {
+
+ JoinFunction> joinFunction = new DefaultJoinFunction();
+
+ JoinWindowFunction> joinWindowFunction = getJoinWindowFunction(
+ joinFunction, this);
+
+ TypeInformation> outType = new TupleTypeInfo>(
+ op.input1.getType(), op.input2.getType());
+
+ return new JoinedStream(this, op.input1
+ .groupBy(keys1)
+ .connect(op.input2.groupBy(keys2))
+ .addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
+ op.slideInterval, op.timeStamp1, op.timeStamp2));
+ }
}
- public static class FinalizeStreamJoin {
+ public static class JoinedStream extends
+ SingleOutputStreamOperator, JoinedStream> {
private final JoinPredicate predicate;
- private FinalizeStreamJoin(JoinPredicate predicate) {
+ private JoinedStream(JoinPredicate predicate, DataStream> ds) {
+ super(ds);
this.predicate = predicate;
}
- /**
- * Completes a stream join. The resulting operator wraps each pair
- * of joining elements into a {@link Tuple2}, with the element of the
- * first input being the first field of the tuple and the element of the
- * second input being the second field of the tuple.
- *
- * @return The joined data stream.
- */
- public SingleOutputStreamOperator, ?> withDefault() {
- return createJoinOperator(new DefaultJoinFunction());
- }
-
/**
* Completes a stream join. The resulting operator wraps each pair
* of joining elements using the user defined {@link JoinFunction}
@@ -200,36 +216,36 @@ public class StreamJoinOperator extends
* @return The joined data stream.
*/
public SingleOutputStreamOperator with(JoinFunction joinFunction) {
- return createJoinOperator(joinFunction);
- }
- private SingleOutputStreamOperator createJoinOperator(
- JoinFunction joinFunction) {
-
- JoinWindowFunction joinWindowFunction = new JoinWindowFunction(
- predicate.keys1, predicate.keys2, joinFunction);
+ TypeInformation outType = TypeExtractor.getJoinReturnTypes(joinFunction,
+ predicate.op.input1.getType(), predicate.op.input2.getType());
- StreamJoinOperator op = predicate.op;
+ CoWindowInvokable invokable = new CoWindowInvokable(
+ getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize,
+ predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2);
- TypeInformation outType = TypeExtractor.getJoinReturnTypes(joinFunction,
- op.input1.getType(), op.input2.getType());
+ jobGraphBuilder.setInvokable(id, invokable);
- return op.input1.connect(op.input2).addGeneralWindowCombine(joinWindowFunction,
- outType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2);
+ return setType(outType);
}
}
- public static final class DefaultJoinFunction implements
- JoinFunction> {
+ public static final class DefaultJoinFunction implements
+ JoinFunction> {
private static final long serialVersionUID = 1L;
- private final Tuple2 outTuple = new Tuple2();
+ private final Tuple2 outTuple = new Tuple2();
@Override
- public Tuple2 join(T1 first, T2 second) throws Exception {
+ public Tuple2 join(I1 first, I2 second) throws Exception {
outTuple.f0 = first;
outTuple.f1 = second;
return outTuple;
}
}
+
+ public static JoinWindowFunction getJoinWindowFunction(
+ JoinFunction joinFunction, JoinPredicate predicate) {
+ return new JoinWindowFunction(predicate.keys1, predicate.keys2, joinFunction);
+ }
}
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index e71b18c62b9de92ad9d3a2e256c21e65f5965f63..c8ad5331af728b5b9a05279746d679332ca85364 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -56,7 +56,7 @@ public class StreamProjection {
protected StreamProjection(DataStream dataStream, int[] fieldIndexes) {
this.dataStream = dataStream;
this.fieldIndexes = fieldIndexes;
- this.inTypeInfo = dataStream.typeInfo;
+ this.inTypeInfo = dataStream.getType();
if (!inTypeInfo.isTupleType()) {
throw new RuntimeException("Only Tuple DataStreams can be projected");
}
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
index d7851092bb59a7ca66b5ea6ebddfdc5fa6ffe2d0..171ddc9e6ace212428f120f1ebb3ad9363db4e1b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/keys/FieldsKeySelector.java
@@ -56,12 +56,13 @@ public abstract class FieldsKeySelector implements KeySelector {
protected Object key;
protected boolean simpleKey;
- public static Class>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class, Tuple3.class,
- Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class,
- Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class,
- Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class,
- Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class,
- Tuple25.class };
+ @SuppressWarnings("unchecked")
+ public static Class extends Tuple>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class,
+ Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
+ Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
+ Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
+ Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
+ Tuple24.class, Tuple25.class };
public FieldsKeySelector(int... fields) {
this.keyFields = fields;
@@ -73,7 +74,7 @@ public abstract class FieldsKeySelector implements KeySelector {
}
try {
- key = (Tuple) tupleClasses[fields.length - 1].newInstance();
+ key = tupleClasses[fields.length - 1].newInstance();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index 07d40ff8fe3127d812f46e484b170c7988c765e6..37f8c0a720ecadf98bc91e581feeaef30000019d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -96,7 +96,7 @@ public class WindowCrossJoinTest implements Serializable {
DataStream inStream2 = env.fromCollection(in2);
inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
- .where(0).equalTo(0).withDefault().addSink(new JoinResultSink());
+ .where(0).equalTo(0).addSink(new JoinResultSink());
inStream1.cross(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
.with(new CrossFunction, Integer, Tuple2, Integer>>() {
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 2586e3c4fa884818d4d41e6f68f4c270cdec7b4c..897ad48bb959a51934ae79394eb201501c1ab756 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.examples.join;
import java.util.Random;
+import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,7 +36,7 @@ import org.apache.flink.util.Collector;
* his example will join two streams with a sliding window. One which emits
* grades and one which emits salaries of people.
*
- *
+ *
*
* This example shows how to:
*
@@ -63,13 +65,13 @@ public class WindowJoin {
// apply a temporal join over the two stream based on the names over one
// second windows
- DataStream, Tuple2>> joinedStream = grades
- .join(salaries)
- .onWindow(1000)
- .where(0)
- .equalTo(0)
- .withDefault();
-
+ DataStream> joinedStream = grades
+ .join(salaries)
+ .onWindow(1000)
+ .where(0)
+ .equalTo(0)
+ .with(new MyJoinFunction());
+
// emit result
if (fileOutput) {
joinedStream.writeAsText(outputPath, 1);
@@ -141,6 +143,24 @@ public class WindowJoin {
}
}
+ public static class MyJoinFunction
+ implements
+ JoinFunction, Tuple2, Tuple3> {
+
+ private static final long serialVersionUID = 1L;
+
+ private Tuple3 joined = new Tuple3();
+
+ @Override
+ public Tuple3 join(Tuple2 first,
+ Tuple2 second) throws Exception {
+ joined.f0 = first.f0;
+ joined.f1 = first.f1;
+ joined.f2 = second.f1;
+ return joined;
+ }
+ }
+
// *************************************************************************
// UTIL METHODS
// *************************************************************************
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index a117412c1fff5926988dc100783dcd5b175bf65c..871fedead49b34ff8685ab51b4399d17ce3ed574 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -108,7 +108,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def groupBy(fields: Int*): DataStream[T] =
- new DataStream[T](javaStream.groupBy(fields: _*))
+ new DataStream[T](javaStream.groupBy(new FieldsKeySelector[T](fields: _*)))
/**
* Groups the elements of a DataStream by the given field expressions to
@@ -138,7 +138,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def partitionBy(fields: Int*): DataStream[T] =
- new DataStream[T](javaStream.partitionBy(fields: _*))
+ new DataStream[T](javaStream.partitionBy(new FieldsKeySelector[T](fields: _*)))
/**
* Sets the partitioning of the DataStream so that the output is
@@ -458,6 +458,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
split(selector)
}
+ def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
+
/**
* Writes a DataStream to the standard output stream (stdout). For each
* element of the DataStream the result of .toString is
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
new file mode 100644
index 0000000000000000000000000000000000000000..422351245c0780505bc7dcf9b402e6dd922df835
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/FieldsKeySelector.scala
@@ -0,0 +1,29 @@
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.streaming.util.keys.{ FieldsKeySelector => JavaSelector }
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.tuple.Tuple
+
+class FieldsKeySelector[IN](fields: Int*) extends KeySelector[IN, Tuple] {
+
+ val t: Tuple = JavaSelector.tupleClasses(fields.length - 1).newInstance()
+
+ override def getKey(value: IN): Tuple =
+
+ value match {
+ case prod: Product => {
+ for (i <- 0 to fields.length - 1) {
+ t.setField(prod.productElement(fields(i)), i)
+ }
+ t
+ }
+ case tuple: Tuple => {
+ for (i <- 0 to fields.length - 1) {
+ t.setField(tuple.getField(fields(i)), i)
+ }
+ t
+ }
+ case _ => throw new RuntimeException("Only tuple types are supported")
+ }
+
+}
\ No newline at end of file
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
new file mode 100644
index 0000000000000000000000000000000000000000..93950a278ca454fca698820f5284ec0c41d2503c
--- /dev/null
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.streaming
+
+import org.apache.flink.api.common.functions.JoinFunction
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.scala.ClosureCleaner
+import org.apache.flink.api.scala.typeutils.CaseClassSerializer
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
+import org.apache.flink.streaming.api.datastream.TemporalOperator
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction
+import org.apache.flink.streaming.util.keys.PojoKeySelector
+import scala.reflect.ClassTag
+import org.apache.commons.lang.Validate
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
+
+class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
+
+ override def createNextWindowOperator() = {
+ new StreamJoinOperator.JoinWindow[I1, I2](this)
+ }
+}
+
+object StreamJoinOperator {
+
+ private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+ ClosureCleaner.clean(f, checkSerializable)
+ f
+ }
+
+ class JoinWindow[I1, I2](op: StreamJoinOperator[I1, I2]) {
+
+ /**
+ * Continues a temporal Join transformation by defining
+ * the fields in the first stream to be used as keys for the join.
+ * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+ * to define the second key.
+ */
+ def where(fields: Int*) = {
+ new JoinPredicate[I1, I2](op, new FieldsKeySelector[I1](fields: _*))
+ }
+
+ /**
+ * Continues a temporal Join transformation by defining
+ * the fields in the first stream to be used as keys for the join.
+ * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+ * to define the second key.
+ */
+ def where(firstField: String, otherFields: String*) = {
+ new JoinPredicate[I1, I2](op, new PojoKeySelector[I1](op.input1.getType(), (firstField +: otherFields): _*))
+ }
+
+ /**
+ * Continues a temporal Join transformation by defining
+ * the keyselector function that will be used to extract keys from the first stream
+ * for the join.
+ * The resulting incomplete join can be completed by JoinPredicate.equalTo()
+ * to define the second key.
+ */
+ def where[K: TypeInformation](fun: (I1) => K) = {
+ val keyType = implicitly[TypeInformation[K]]
+ val keyExtractor = new KeySelector[I1, K] {
+ val cleanFun = op.input1.clean(fun)
+ def getKey(in: I1) = cleanFun(in)
+ }
+ new JoinPredicate[I1, I2](op, keyExtractor)
+ }
+
+ }
+
+ class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], private[flink] val keys1: KeySelector[I1, _]) {
+ private[flink] var keys2: KeySelector[I2, _] = null
+
+ /**
+ * Creates a temporal join transformation by defining the second join key.
+ * The returned transformation wrapes each joined element pair in a tuple2:
+ * (first, second)
+ * To define a custom wrapping, use JoinedStream.with(...)
+ */
+ def equalTo(fields: Int*): JoinedStream[I1, I2] = {
+ finish(new FieldsKeySelector[I2](fields: _*))
+ }
+
+ /**
+ * Creates a temporal join transformation by defining the second join key.
+ * The returned transformation wrapes each joined element pair in a tuple2:
+ * (first, second)
+ * To define a custom wrapping, use JoinedStream.with(...)
+ */
+ def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] = {
+ finish(new PojoKeySelector[I2](op.input2.getType(), (firstField +: otherFields): _*))
+ }
+
+ /**
+ * Creates a temporal join transformation by defining the second join key.
+ * The returned transformation wrapes each joined element pair in a tuple2:
+ * (first, second)
+ * To define a custom wrapping, use JoinedStream.with(...)
+ */
+ def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
+ val keyType = implicitly[TypeInformation[K]]
+ val keyExtractor = new KeySelector[I2, K] {
+ val cleanFun = op.input1.clean(fun)
+ def getKey(in: I2) = cleanFun(in)
+ }
+ finish(keyExtractor)
+ }
+
+ private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
+ this.keys2 = keys2
+ new JoinedStream[I1, I2](this, createJoinOperator())
+ }
+
+ private def createJoinOperator(): JavaStream[(I1, I2)] = {
+
+ val returnType = new CaseClassTypeInfo[(I1, I2)](
+
+ classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {
+
+ override def createSerializer: TypeSerializer[(I1, I2)] = {
+ val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
+ for (i <- 0 until getArity) {
+ fieldSerializers(i) = types(i).createSerializer
+ }
+
+ new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
+ override def createInstance(fields: Array[AnyRef]) = {
+ (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
+ }
+ }
+ }
+ }
+
+ return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)).addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
+ returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+ }
+ }
+
+ class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
+
+ private val op = jp.op
+
+ def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
+
+ val invokable = new CoWindowInvokable[I1, I2, R](
+ clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
+
+ javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable)
+
+ new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
+ }
+ }
+
+ private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], joinFunction: (I1, I2) => R) = {
+ Validate.notNull(joinFunction, "Join function must not be null.")
+
+ val joinFun = new JoinFunction[I1, I2, R] {
+
+ val cleanFun = clean(joinFunction)
+
+ override def join(first: I1, second: I2): R = {
+ cleanFun(first, second)
+ }
+ }
+
+ new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
+ }
+
+}
\ No newline at end of file
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
index ff89a47884b35a90370895bd6360151a5d7403b3..c6864971973359df39a8bb9bfab1f20cf78a9221 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
@@ -61,7 +61,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
*
*/
def groupBy(fields: Int*): WindowedDataStream[T] =
- new WindowedDataStream[T](javaStream.groupBy(fields: _*))
+ new WindowedDataStream[T](javaStream.groupBy(new FieldsKeySelector[T](fields: _*)))
/**
* Groups the elements of the WindowedDataStream using the given