提交 555837cd 编写于 作者: G Gyula Fora

[scala] [streaming] Temporal join operator added

上级 1c87d8bc
......@@ -505,6 +505,15 @@ public class JobGraphBuilder {
}
/**
 * Stores the given invokable in {@code invokableObjects} under the given
 * vertex id.
 *
 * @param id
 *            Id of the vertex the invokable belongs to
 * @param invokableObject
 *            The invokable to store for that id
 */
public <IN, OUT> void setInvokable(String id, StreamInvokable<IN, OUT> invokableObject) {
invokableObjects.put(id, invokableObject);
}
/**
 * Wraps the given type information in a {@link StreamRecordSerializer} and
 * stores it in {@code typeSerializersOut1} under the given vertex id.
 *
 * @param id
 *            Id of the vertex whose output serializer is set
 * @param outType
 *            Type information describing the vertex output
 */
public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
	typeSerializersOut1.put(id, new StreamRecordSerializer<OUT>(outType));
}
/**
* Sets TypeSerializerWrapper from one vertex to another, used with some
* sinks.
......
......@@ -539,7 +539,7 @@ public class ConnectedDataStream<IN1, IN2> {
return invokable;
}
protected <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo,
long windowSize, long slideInterval, TimeStamp<IN1> timestamp1,
TimeStamp<IN2> timestamp2) {
......@@ -550,7 +550,7 @@ public class ConnectedDataStream<IN1, IN2> {
if (slideInterval < 1) {
throw new IllegalArgumentException("Slide interval must be positive");
}
return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
......
......@@ -92,7 +92,8 @@ public class DataStream<OUT> {
protected List<String> userDefinedNames;
protected boolean selectAll;
protected StreamPartitioner<OUT> partitioner;
protected final TypeInformation<OUT> typeInfo;
@SuppressWarnings("rawtypes")
protected TypeInformation typeInfo;
protected List<DataStream<OUT>> mergedStreams;
protected final JobGraphBuilder jobGraphBuilder;
......@@ -175,10 +176,18 @@ public class DataStream<OUT> {
*
* @return The type of the datastream.
*/
@SuppressWarnings("unchecked")
public TypeInformation<OUT> getType() {
// typeInfo is declared as a raw TypeInformation, so handing it out as
// TypeInformation<OUT> is an unchecked conversion (hence the annotation).
return this.typeInfo;
}
/**
 * Replaces the output type of this stream: registers the new type with the
 * job graph builder for this stream's id and overwrites {@code typeInfo}.
 * No new stream is created; the same instance is returned, cast to the
 * new element type.
 *
 * @param outType
 *            The new output type information
 * @return This stream, viewed as a {@code DataStream<R>}
 */
@SuppressWarnings("unchecked")
public <R> DataStream<R> setType(TypeInformation<R> outType) {
jobGraphBuilder.setOutType(id, outType);
typeInfo = outType;
// Unchecked cast: the object itself is unchanged, only its static type.
return (DataStream<R>) this;
}
public <F> F clean(F f) {
if (getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
ClosureCleaner.clean(f, true);
......@@ -979,7 +988,7 @@ public class DataStream<OUT> {
StreamReduceInvokable<OUT> invokable = new StreamReduceInvokable<OUT>(aggregate);
SingleOutputStreamOperator<OUT, ?> returnStream = transform("reduce", typeInfo, invokable);
SingleOutputStreamOperator<OUT, ?> returnStream = transform("reduce", getType(), invokable);
return returnStream;
}
......@@ -1077,7 +1086,7 @@ public class DataStream<OUT> {
*/
public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {
DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", typeInfo);
DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType());
jobGraphBuilder.addStreamVertex(returnStream.getId(), new SinkInvokable<OUT>(
clean(sinkFunction)), getType(), null, "sink", degreeOfParallelism);
......
......@@ -186,7 +186,7 @@ public class GroupedDataStream<OUT> extends DataStream<OUT> {
GroupedReduceInvokable<OUT> invokable = new GroupedReduceInvokable<OUT>(clean(aggregate),
keySelector);
SingleOutputStreamOperator<OUT, ?> returnStream = transform("groupReduce", typeInfo,
SingleOutputStreamOperator<OUT, ?> returnStream = transform("groupReduce", getType(),
invokable);
return returnStream;
......
......@@ -33,7 +33,7 @@ import org.apache.flink.streaming.state.OperatorState;
/**
* The SingleOutputStreamOperator represents a user defined transformation
* applied on a {@link DataStream} with one predefined output type.
*
*
* @param <OUT>
* Output type of the operator.
* @param <O>
......@@ -52,6 +52,13 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
super(dataStream);
}
/**
 * Replaces the output type of this operator: registers the new type with
 * the job graph builder for this operator's id and overwrites
 * {@code typeInfo}. The same instance is returned, cast to the new element
 * type.
 *
 * @param outType
 *            The new output type information
 * @return This operator, viewed as a {@code SingleOutputStreamOperator<R, ?>}
 */
@SuppressWarnings("unchecked")
public <R> SingleOutputStreamOperator<R, ?> setType(TypeInformation<R> outType) {
jobGraphBuilder.setOutType(id, outType);
typeInfo = outType;
// Unchecked cast: the object itself is unchanged, only its static type.
return (SingleOutputStreamOperator<R, ?>) this;
}
/**
* Sets the degree of parallelism for this operator. The degree must be 1 or
* more.
......@@ -71,7 +78,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
return this;
}
/**
* Sets the maximum time frequency (ms) for the flushing of the output
* buffer. By default the output buffers flush only when they are full.
......
......@@ -23,8 +23,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
import org.apache.flink.streaming.util.keys.FieldsKeySelector;
import org.apache.flink.streaming.util.keys.PojoKeySelector;
......@@ -122,77 +124,91 @@ public class StreamJoinOperator<I1, I2> extends
}
/**
* Continues a temporal Join transformation and defines the
* {@link Tuple} fields of the second join {@link DataStream} that
* should be used as join keys.<br/>
* <b>Note: Fields can only be selected as join keys on Tuple
* DataStreams.</b><br/>
* Creates a temporal Join transformation and defines the {@link Tuple}
* fields of the second join {@link DataStream} that should be used as
* join keys.<br/>
* </p> The resulting operator wraps each pair of joining elements in a
* Tuple2<I1,I2>(first, second). To use a different wrapping function
* use {@link JoinedStream#with(JoinFunction)}
*
* @param fields
* The indexes of the Tuple fields of the second join
* DataStream that should be used as keys.
* @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
* {@link FinalizeStreamJoin#withDefault} to complete
* @return A streaming join operator. Call {@link JoinedStream#with} to
* apply a custom wrapping
*/
public FinalizeStreamJoin<I1, I2> equalTo(int... fields) {
public JoinedStream<I1, I2> equalTo(int... fields) {
keys2 = FieldsKeySelector.getSelector(op.input2.getType(), fields);
return new FinalizeStreamJoin<I1, I2>(this);
return createJoinOperator();
}
/**
* Continues a temporal Join transformation and defines the fields of
* the second join {@link DataStream} that should be used as join keys.<br/>
* Creates a temporal Join transformation and defines the fields of the
* second join {@link DataStream} that should be used as join keys. </p>
* The resulting operator wraps each pair of joining elements in a
* Tuple2<I1,I2>(first, second). To use a different wrapping function
* use {@link JoinedStream#with(JoinFunction)}
*
* @param fields
* The fields of the second join DataStream that should be
* used as keys.
* @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
* {@link FinalizeStreamJoin#withDefault} to complete
* @return A streaming join operator. Call {@link JoinedStream#with} to
* apply a custom wrapping
*/
public FinalizeStreamJoin<I1, I2> equalTo(String... fields) {
public JoinedStream<I1, I2> equalTo(String... fields) {
this.keys2 = new PojoKeySelector<I2>(op.input2.getType(), fields);
return new FinalizeStreamJoin<I1, I2>(this);
return createJoinOperator();
}
/**
* Continues a temporal Join transformation and defines a
* Creates a temporal Join transformation and defines a
* {@link KeySelector} function for the second join {@link DataStream}
* .</br> The KeySelector function is called for each element of the
* second DataStream and extracts a single key value on which the
* DataStream is joined. </br>
* DataStream is joined. </p> The resulting operator wraps each pair of
* joining elements in a Tuple2<I1,I2>(first, second). To use a
* different wrapping function use
* {@link JoinedStream#with(JoinFunction)}
*
*
* @param keySelector
* The KeySelector function which extracts the key values
* from the second DataStream on which it is joined.
* @return An incomplete join. Call {@link FinalizeStreamJoin#with} or
* {@link FinalizeStreamJoin#withDefault} to complete
* @return A streaming join operator. Call {@link JoinedStream#with} to
* apply a custom wrapping
*/
public <K> FinalizeStreamJoin<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
this.keys2 = keySelector;
return new FinalizeStreamJoin<I1, I2>(this);
return createJoinOperator();
}
/**
 * Builds the join with the default wrapping, i.e. every joined pair is
 * emitted as a {@code Tuple2<I1, I2>}. Both inputs are grouped by their
 * key selectors, connected, and combined with a window function that uses
 * the window size, slide interval and timestamps of {@code op}.
 *
 * @return The joined stream wrapping the created window operator
 */
private JoinedStream<I1, I2> createJoinOperator() {
	JoinWindowFunction<I1, I2, Tuple2<I1, I2>> windowFunction = getJoinWindowFunction(
			new DefaultJoinFunction<I1, I2>(), this);

	TypeInformation<Tuple2<I1, I2>> pairType = new TupleTypeInfo<Tuple2<I1, I2>>(
			op.input1.getType(), op.input2.getType());

	DataStream<Tuple2<I1, I2>> joined = op.input1
			.groupBy(keys1)
			.connect(op.input2.groupBy(keys2))
			.addGeneralWindowCombine(windowFunction, pairType, op.windowSize,
					op.slideInterval, op.timeStamp1, op.timeStamp2);

	return new JoinedStream<I1, I2>(this, joined);
}
}
public static class FinalizeStreamJoin<I1, I2> {
public static class JoinedStream<I1, I2> extends
SingleOutputStreamOperator<Tuple2<I1, I2>, JoinedStream<I1, I2>> {
private final JoinPredicate<I1, I2> predicate;
private FinalizeStreamJoin(JoinPredicate<I1, I2> predicate) {
private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<Tuple2<I1, I2>> ds) {
super(ds);
this.predicate = predicate;
}
/**
* Completes a stream join. </p> The resulting operator wraps each pair
* of joining elements into a {@link Tuple2}, with the element of the
* first input being the first field of the tuple and the element of the
* second input being the second field of the tuple.
*
* @return The joined data stream.
*/
public SingleOutputStreamOperator<Tuple2<I1, I2>, ?> withDefault() {
return createJoinOperator(new DefaultJoinFunction<I1, I2>());
}
/**
* Completes a stream join. </p> The resulting operator wraps each pair
* of joining elements using the user defined {@link JoinFunction}
......@@ -200,36 +216,36 @@ public class StreamJoinOperator<I1, I2> extends
* @return The joined data stream.
*/
public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
return createJoinOperator(joinFunction);
}
private <OUT> SingleOutputStreamOperator<OUT, ?> createJoinOperator(
JoinFunction<I1, I2, OUT> joinFunction) {
JoinWindowFunction<I1, I2, OUT> joinWindowFunction = new JoinWindowFunction<I1, I2, OUT>(
predicate.keys1, predicate.keys2, joinFunction);
TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
predicate.op.input1.getType(), predicate.op.input2.getType());
StreamJoinOperator<I1, I2> op = predicate.op;
CoWindowInvokable<I1, I2, OUT> invokable = new CoWindowInvokable<I1, I2, OUT>(
getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize,
predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2);
TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
op.input1.getType(), op.input2.getType());
jobGraphBuilder.setInvokable(id, invokable);
return op.input1.connect(op.input2).addGeneralWindowCombine(joinWindowFunction,
outType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2);
return setType(outType);
}
}
public static final class DefaultJoinFunction<T1, T2> implements
JoinFunction<T1, T2, Tuple2<T1, T2>> {
public static final class DefaultJoinFunction<I1, I2> implements
JoinFunction<I1, I2, Tuple2<I1, I2>> {
private static final long serialVersionUID = 1L;
private final Tuple2<T1, T2> outTuple = new Tuple2<T1, T2>();
private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>();
@Override
public Tuple2<T1, T2> join(T1 first, T2 second) throws Exception {
public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception {
outTuple.f0 = first;
outTuple.f1 = second;
return outTuple;
}
}
/**
 * Creates the window function of a join from the two key selectors held by
 * the predicate and the given join function.
 *
 * @param joinFunction
 *            Function applied to the joined element pairs
 * @param predicate
 *            Join predicate providing {@code keys1} and {@code keys2}
 * @return The window function built from these three parts
 */
public static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) {
return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction);
}
}
......@@ -56,7 +56,7 @@ public class StreamProjection<IN> {
protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
this.dataStream = dataStream;
this.fieldIndexes = fieldIndexes;
this.inTypeInfo = dataStream.typeInfo;
this.inTypeInfo = dataStream.getType();
if (!inTypeInfo.isTupleType()) {
throw new RuntimeException("Only Tuple DataStreams can be projected");
}
......
......@@ -56,12 +56,13 @@ public abstract class FieldsKeySelector<IN> implements KeySelector<IN, Object> {
protected Object key;
protected boolean simpleKey;
public static Class<?>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class, Tuple3.class,
Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class,
Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class,
Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class,
Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class,
Tuple25.class };
@SuppressWarnings("unchecked")
public static Class<? extends Tuple>[] tupleClasses = new Class[] { Tuple1.class, Tuple2.class,
Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class,
Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class,
Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class,
Tuple24.class, Tuple25.class };
public FieldsKeySelector(int... fields) {
this.keyFields = fields;
......@@ -73,7 +74,7 @@ public abstract class FieldsKeySelector<IN> implements KeySelector<IN, Object> {
}
try {
key = (Tuple) tupleClasses[fields.length - 1].newInstance();
key = tupleClasses[fields.length - 1].newInstance();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
......
......@@ -96,7 +96,7 @@ public class WindowCrossJoinTest implements Serializable {
DataStream<Integer> inStream2 = env.fromCollection(in2);
inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
.where(0).equalTo(0).withDefault().addSink(new JoinResultSink());
.where(0).equalTo(0).addSink(new JoinResultSink());
inStream1.cross(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
.with(new CrossFunction<Tuple2<Integer,String>, Integer, Tuple2<Tuple2<Integer,String>, Integer>>() {
......
......@@ -19,7 +19,9 @@ package org.apache.flink.streaming.examples.join;
import java.util.Random;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -34,7 +36,7 @@ import org.apache.flink.util.Collector;
* This example will join two streams with a sliding window. One which emits
* grades and one which emits salaries of people.
* </p>
*
*
* <p>
* This example shows how to:
* <ul>
......@@ -63,13 +65,13 @@ public class WindowJoin {
// apply a temporal join over the two stream based on the names over one
// second windows
DataStream<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> joinedStream = grades
.join(salaries)
.onWindow(1000)
.where(0)
.equalTo(0)
.withDefault();
DataStream<Tuple3<String, Integer, Integer>> joinedStream = grades
.join(salaries)
.onWindow(1000)
.where(0)
.equalTo(0)
.with(new MyJoinFunction());
// emit result
if (fileOutput) {
joinedStream.writeAsText(outputPath, 1);
......@@ -141,6 +143,24 @@ public class WindowJoin {
}
}
/**
 * Join function of the example: merges the two joined records into one
 * {@code Tuple3} made of the first record's two fields followed by the
 * second field of the second record.
 */
public static class MyJoinFunction
		implements
		JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {

	private static final long serialVersionUID = 1L;

	// One output tuple per function instance; it is overwritten and
	// returned again on every call.
	private Tuple3<String, Integer, Integer> result = new Tuple3<String, Integer, Integer>();

	@Override
	public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first,
			Tuple2<String, Integer> second) throws Exception {
		result.f0 = first.f0;
		result.f1 = first.f1;
		result.f2 = second.f1;
		return result;
	}
}
// *************************************************************************
// UTIL METHODS
// *************************************************************************
......
......@@ -108,7 +108,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def groupBy(fields: Int*): DataStream[T] =
new DataStream[T](javaStream.groupBy(fields: _*))
new DataStream[T](javaStream.groupBy(new FieldsKeySelector[T](fields: _*)))
/**
* Groups the elements of a DataStream by the given field expressions to
......@@ -138,7 +138,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def partitionBy(fields: Int*): DataStream[T] =
new DataStream[T](javaStream.partitionBy(fields: _*))
new DataStream[T](javaStream.partitionBy(new FieldsKeySelector[T](fields: _*)))
/**
* Sets the partitioning of the DataStream so that the output is
......@@ -458,6 +458,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
split(selector)
}
/**
 * Starts a temporal join of this DataStream with the given one by wrapping
 * the two underlying Java streams in a StreamJoinOperator. The returned
 * operator is used to define the remaining parts of the join.
 */
def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
/**
* Writes a DataStream to the standard output stream (stdout). For each
* element of the DataStream the result of .toString is
......
package org.apache.flink.api.scala.streaming
import org.apache.flink.streaming.util.keys.{ FieldsKeySelector => JavaSelector }
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple
/**
 * Key selector that builds a Flink Java [[Tuple]] key out of the given field
 * positions of the input element. Scala products (tuples, case classes) and
 * Flink Java tuples are supported; any other input fails with a
 * RuntimeException.
 *
 * @param fields Positions of the fields that make up the key. At least one
 *               and at most as many positions as there are Java tuple
 *               classes must be given, and none may be negative.
 * @tparam IN Type of the elements the key is extracted from
 */
class FieldsKeySelector[IN](fields: Int*) extends KeySelector[IN, Tuple] {

  // Fail fast with a descriptive message: without these checks an empty field
  // list ends up indexing tupleClasses(-1) and too many fields overrun it.
  require(
    fields.nonEmpty && fields.length <= JavaSelector.tupleClasses.length,
    s"Between 1 and ${JavaSelector.tupleClasses.length} key fields are required, " +
      s"got ${fields.length}")
  require(fields.forall(_ >= 0), "Key field positions must be non-negative")

  /**
   * Empty key tuple created at construction time. Kept as a public member for
   * compatibility; getKey no longer hands this instance out.
   */
  val t: Tuple = newKey()

  /** Instantiates an empty Java tuple with one slot per key field. */
  private def newKey(): Tuple = JavaSelector.tupleClasses(fields.length - 1).newInstance()

  /**
   * Extracts the key of the given element.
   *
   * A new tuple is returned on every call. A single shared instance would be
   * overwritten by the next call, which silently changes keys that a caller
   * still holds on to (for example as keys of a hash map).
   *
   * @param value Element to extract the key from
   * @return Tuple holding the selected fields in the order they were given
   */
  override def getKey(value: IN): Tuple =
    value match {
      case prod: Product =>
        val key = newKey()
        for (i <- fields.indices) {
          key.setField(prod.productElement(fields(i)), i)
        }
        key
      case tuple: Tuple =>
        val key = newKey()
        for (i <- fields.indices) {
          // Explicit type argument: getField is a Java generic method and
          // Scala may otherwise infer Nothing for its result type.
          key.setField(tuple.getField[AnyRef](fields(i)), i)
        }
        key
      case _ => throw new RuntimeException("Only tuple types are supported")
    }
}
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.scala.streaming
import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.api.scala.typeutils.CaseClassSerializer
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.streaming.api.datastream.TemporalOperator
import org.apache.flink.streaming.api.function.co.JoinWindowFunction
import org.apache.flink.streaming.util.keys.PojoKeySelector
import scala.reflect.ClassTag
import org.apache.commons.lang.Validate
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
/**
 * Scala entry point of a temporal join between two Java data streams. It is a
 * TemporalOperator whose next step is a [[StreamJoinOperator.JoinWindow]].
 *
 * @param i1 First input stream of the join
 * @param i2 Second input stream of the join
 */
class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2])
  extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {

  /** Creates the JoinWindow step bound to this operator. */
  override def createNextWindowOperator(): StreamJoinOperator.JoinWindow[I1, I2] =
    new StreamJoinOperator.JoinWindow[I1, I2](this)
}
object StreamJoinOperator {
/**
 * Runs the Scala ClosureCleaner on the given function object and returns the
 * same object afterwards.
 *
 * @param f Function object to clean
 * @param checkSerializable Passed through to ClosureCleaner.clean
 * @return The object that was passed in
 */
private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
// The cleaner's result is not used; the argument itself is returned.
f
}
/**
 * First step of a temporal join definition: selects the join key of the first
 * input. Every `where` variant returns a [[JoinPredicate]] on which the key
 * of the second input is defined with `equalTo`.
 *
 * @param op The join operator holding the two inputs and the window settings
 */
class JoinWindow[I1, I2](op: StreamJoinOperator[I1, I2]) {

  /**
   * Continues a temporal Join transformation by defining
   * the fields in the first stream to be used as keys for the join.
   * The resulting incomplete join can be completed by JoinPredicate.equalTo()
   * to define the second key.
   */
  def where(fields: Int*): JoinPredicate[I1, I2] =
    new JoinPredicate[I1, I2](op, new FieldsKeySelector[I1](fields: _*))

  /**
   * Continues a temporal Join transformation by defining
   * the named fields in the first stream to be used as keys for the join.
   * The resulting incomplete join can be completed by JoinPredicate.equalTo()
   * to define the second key.
   */
  def where(firstField: String, otherFields: String*): JoinPredicate[I1, I2] =
    new JoinPredicate[I1, I2](
      op,
      new PojoKeySelector[I1](op.input1.getType(), (firstField +: otherFields): _*))

  /**
   * Continues a temporal Join transformation by defining
   * the key selector function that will be used to extract keys from the
   * first stream for the join.
   * The resulting incomplete join can be completed by JoinPredicate.equalTo()
   * to define the second key.
   */
  def where[K: TypeInformation](fun: (I1) => K): JoinPredicate[I1, I2] = {
    // Clean the function before building the selector, so the anonymous
    // KeySelector below only refers to this local value and not to `op`
    // (and through it to this builder).
    val cleanFun = op.input1.clean(fun)
    val keyExtractor = new KeySelector[I1, K] {
      def getKey(in: I1) = cleanFun(in)
    }
    new JoinPredicate[I1, I2](op, keyExtractor)
  }
}
/**
 * Second step of a temporal join definition: holds the key selector of the
 * first input and, once one of the `equalTo` variants is called, the key
 * selector of the second input. Calling `equalTo` creates the join.
 *
 * @param op The join operator holding the two inputs and the window settings
 * @param keys1 Key selector of the first input
 */
class JoinPredicate[I1, I2](
    private[flink] val op: StreamJoinOperator[I1, I2],
    private[flink] val keys1: KeySelector[I1, _]) {

  // Stays null until one of the equalTo variants has been called.
  private[flink] var keys2: KeySelector[I2, _] = null

  /**
   * Creates a temporal join transformation by defining the second join key.
   * The returned transformation wraps each joined element pair in a tuple2:
   * (first, second)
   * To define a custom wrapping, use JoinedStream.apply(...)
   */
  def equalTo(fields: Int*): JoinedStream[I1, I2] =
    finish(new FieldsKeySelector[I2](fields: _*))

  /**
   * Creates a temporal join transformation by defining the second join key.
   * The returned transformation wraps each joined element pair in a tuple2:
   * (first, second)
   * To define a custom wrapping, use JoinedStream.apply(...)
   */
  def equalTo(firstField: String, otherFields: String*): JoinedStream[I1, I2] =
    finish(new PojoKeySelector[I2](op.input2.getType(), (firstField +: otherFields): _*))

  /**
   * Creates a temporal join transformation by defining the second join key.
   * The returned transformation wraps each joined element pair in a tuple2:
   * (first, second)
   * To define a custom wrapping, use JoinedStream.apply(...)
   */
  def equalTo[K: TypeInformation](fun: (I2) => K): JoinedStream[I1, I2] = {
    // The function works on the second input, so it is cleaned through that
    // input. Cleaning happens before the selector is built, so the anonymous
    // KeySelector only refers to this local value and not to `op`.
    val cleanFun = op.input2.clean(fun)
    val keyExtractor = new KeySelector[I2, K] {
      def getKey(in: I2) = cleanFun(in)
    }
    finish(keyExtractor)
  }

  /** Stores the second key selector and creates the default joined stream. */
  private def finish(keys2: KeySelector[I2, _]): JoinedStream[I1, I2] = {
    // keys2 must be set first: createJoinOperator reads it from this object.
    this.keys2 = keys2
    new JoinedStream[I1, I2](this, createJoinOperator())
  }

  /**
   * Groups both inputs by their keys, connects them and adds the window
   * combine that emits every joined pair as a Scala tuple (first, second).
   */
  private def createJoinOperator(): JavaStream[(I1, I2)] = {
    // Type information for the (I1, I2) result, assembled from the type
    // information of the two inputs.
    val returnType = new CaseClassTypeInfo[(I1, I2)](
      classOf[(I1, I2)], Seq(op.input1.getType, op.input2.getType), Array("_1", "_2")) {

      override def createSerializer: TypeSerializer[(I1, I2)] = {
        val fieldSerializers: Array[TypeSerializer[_]] = new Array[TypeSerializer[_]](getArity)
        for (i <- 0 until getArity) {
          fieldSerializers(i) = types(i).createSerializer
        }

        new CaseClassSerializer[(I1, I2)](classOf[(I1, I2)], fieldSerializers) {
          override def createInstance(fields: Array[AnyRef]) = {
            (fields(0).asInstanceOf[I1], fields(1).asInstanceOf[I2])
          }
        }
      }
    }

    op.input1
      .groupBy(keys1)
      .connect(op.input2.groupBy(keys2))
      .addGeneralWindowCombine(
        getJoinWindowFunction(this, (first: I1, second: I2) => (first, second)),
        returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
  }
}
/**
 * Stream of joined pairs (first, second) produced by a temporal join. A
 * different wrapping of the pairs can be selected with `apply`.
 *
 * @param jp The predicate the join was created from
 * @param javaStream The underlying Java stream of joined pairs
 */
class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)])
  extends DataStream[(I1, I2)](javaStream) {

  private val op = jp.op

  /**
   * Replaces the wrapping of the joined pairs with the given function: a new
   * window invokable built around `fun` is registered under the id of the
   * underlying Java stream, and that stream's type is set to the type of R.
   *
   * @param fun Function applied to every joined pair
   * @return DataStream over the re-typed underlying Java stream
   */
  def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
    val windowFunction = clean(getJoinWindowFunction(jp, fun))
    val windowInvokable = new CoWindowInvokable[I1, I2, R](
      windowFunction, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)

    val graphBuilder = javaStream.getExecutionEnvironment().getJobGraphBuilder()
    graphBuilder.setInvokable(javaStream.getId(), windowInvokable)

    val resultType = implicitly[TypeInformation[R]]
    new DataStream[R](javaStream.setType(resultType))
  }
}
/**
 * Builds the window function of a join from the two key selectors of the
 * given predicate and a Scala join function. The function is checked for
 * null, cleaned, and adapted to the JoinFunction interface.
 *
 * @param jp Predicate providing keys1 and keys2
 * @param joinFunction Function applied to every joined pair
 * @return Window function combining the two key selectors with the function
 */
private[flink] def getJoinWindowFunction[I1, I2, R](
    jp: JoinPredicate[I1, I2],
    joinFunction: (I1, I2) => R): JoinWindowFunction[I1, I2, R] = {
  Validate.notNull(joinFunction, "Join function must not be null.")

  // Adapter from the Scala function to the JoinFunction interface.
  val adapter = new JoinFunction[I1, I2, R] {
    val cleanedJoin = clean(joinFunction)

    override def join(first: I1, second: I2): R = cleanedJoin(first, second)
  }

  new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, adapter)
}
}
\ No newline at end of file
......@@ -61,7 +61,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
*
*/
def groupBy(fields: Int*): WindowedDataStream[T] =
new WindowedDataStream[T](javaStream.groupBy(fields: _*))
new WindowedDataStream[T](javaStream.groupBy(new FieldsKeySelector[T](fields: _*)))
/**
* Groups the elements of the WindowedDataStream using the given
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册