/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.streaming.api.datastream;

import java.io.Serializable;

import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.JobGraphBuilder;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;
import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
import org.apache.flink.streaming.util.serialization.TypeWrapper;

/**
 * The ConnectedDataStream represents a stream for two different data types. It
 * can be used to apply transformations like {@link CoMapFunction} on two
 * {@link DataStream}s
 *
 * @param <IN1>
 *            Type of the first DataStream.
 * @param <IN2>
 *            Type of the second DataStream.
 */
public class ConnectedDataStream<IN1, IN2> {

	protected StreamExecutionEnvironment environment;
	protected JobGraphBuilder jobGraphBuilder;
	protected DataStream<IN1> input1;
	protected DataStream<IN2> input2;

	protected boolean isGrouped;
	protected int keyPosition1;
	protected int keyPosition2;

	protected ConnectedDataStream(DataStream<IN1> input1, DataStream<IN2> input2) {
		this.jobGraphBuilder = input1.jobGraphBuilder;
		this.environment = input1.environment;
		this.input1 = input1.copy();
		this.input2 = input2.copy();

		if ((input1 instanceof GroupedDataStream) && (input2 instanceof GroupedDataStream)) {
			this.isGrouped = true;
			this.keyPosition1 = ((GroupedDataStream<IN1>) input1).keyPosition;
			this.keyPosition2 = ((GroupedDataStream<IN2>) input2).keyPosition;
		} else {
			this.isGrouped = false;
		}
	}

	/**
	 * Returns the first {@link DataStream}.
	 * 
	 * @return The first DataStream.
	 */
	public DataStream<IN1> getFirst() {
		return input1.copy();
	}

	/**
	 * Returns the second {@link DataStream}.
	 * 
	 * @return The second DataStream.
	 */
	public DataStream<IN2> getSecond() {
		return input2.copy();
	}

	/**
	 * Gets the type of the first input
	 * 
	 * @return The type of the first input
	 */
	public TypeInformation<IN1> getInputType1() {
		return input1.getOutputType();
	}

	/**
	 * Gets the type of the second input
	 * 
	 * @return The type of the second input
	 */
	public TypeInformation<IN2> getInputType2() {
		return input2.getOutputType();
	}

	/**
	 * GroupBy operation for a connected data stream. Groups the elements of
	 * input1 and input2 according to keyPosition1 and keyPosition2. Used for
	 * applying functions on grouped data streams, for example
	 * {@link ConnectedDataStream#reduce}.
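	 * <p>
	 * A minimal sketch (illustrative only; {@code connectedStream} stands for
	 * an already constructed connected stream of tuples and
	 * {@code myCoReducer} for a user-supplied {@link CoReduceFunction}):
	 * 
	 * <pre>
	 * connectedStream.groupBy(0, 0).reduce(myCoReducer);
	 * </pre>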
	 * 
	 * @param keyPosition1
	 *            The field used to compute the hashcode of the elements in the
	 *            first input stream.
	 * @param keyPosition2
	 *            The field used to compute the hashcode of the elements in the
	 *            second input stream.
	 * @return The grouped {@link ConnectedDataStream}.
	 */
	public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int keyPosition2) {
		if (keyPosition1 < 0 || keyPosition2 < 0) {
			throw new IllegalArgumentException("The position of the field must be non-negative");
		}

		return new ConnectedDataStream<IN1, IN2>(input1.groupBy(keyPosition1),
				input2.groupBy(keyPosition2));
	}

	/**
	 * Applies a CoMap transformation on a {@link ConnectedDataStream} and maps
	 * the output to a common type. The transformation calls a
	 * {@link CoMapFunction#map1} for each element of the first input and
	 * {@link CoMapFunction#map2} for each element of the second input. Each
	 * CoMapFunction call returns exactly one element. The user can also extend
	 * {@link RichCoMapFunction} to gain access to other features provided by
	 * the {@link RichFunction} interface.
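	 * <p>
	 * A minimal sketch (illustrative only; {@code connectedStream} stands for
	 * an already constructed {@code ConnectedDataStream<String, Integer>} and
	 * the bodies of map1/map2 are placeholders):
	 * 
	 * <pre>
	 * connectedStream.map(new CoMapFunction&lt;String, Integer, String&gt;() {
	 * 	public String map1(String value) {
	 * 		return value;
	 * 	}
	 * 
	 * 	public String map2(Integer value) {
	 * 		return value.toString();
	 * 	}
	 * });
	 * </pre>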
	 * 
	 * @param coMapper
	 *            The CoMapFunction used to jointly transform the two input
	 *            DataStreams
	 * @return The transformed DataStream
	 */
	public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, IN2, OUT> coMapper) {
		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coMapper,
				CoMapFunction.class, 0);
		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coMapper,
				CoMapFunction.class, 1);
		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coMapper,
				CoMapFunction.class, 2);

		return addCoFunction("coMap", coMapper, in1TypeWrapper, in2TypeWrapper, outTypeWrapper,
				new CoMapInvokable<IN1, IN2, OUT>(coMapper));
	}

	/**
	 * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} and
	 * maps the output to a common type. The transformation calls a
	 * {@link CoFlatMapFunction#flatMap1} for each element of the first input
	 * and {@link CoFlatMapFunction#flatMap2} for each element of the second
	 * input. Each CoFlatMapFunction call can return any number of elements,
	 * including none. The user can also extend {@link RichFlatMapFunction} to
	 * gain access to other features provided by the {@link RichFunction}
	 * interface.
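	 * <p>
	 * A minimal sketch (illustrative only; it assumes flatMap1/flatMap2 emit
	 * their results through an {@code org.apache.flink.util.Collector}, and
	 * {@code connectedStream} stands for an already constructed
	 * {@code ConnectedDataStream<String, Integer>}):
	 * 
	 * <pre>
	 * connectedStream.flatMap(new CoFlatMapFunction&lt;String, Integer, String&gt;() {
	 * 	public void flatMap1(String value, Collector&lt;String&gt; out) {
	 * 		for (String token : value.split(" ")) {
	 * 			out.collect(token);
	 * 		}
	 * 	}
	 * 
	 * 	public void flatMap2(Integer value, Collector&lt;String&gt; out) {
	 * 		out.collect(value.toString());
	 * 	}
	 * });
	 * </pre>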
	 * 
	 * @param coFlatMapper
	 *            The CoFlatMapFunction used to jointly transform the two input
	 *            DataStreams
	 * @return The transformed DataStream
	 */
	public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
			CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coFlatMapper,
				CoFlatMapFunction.class, 0);
		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coFlatMapper,
				CoFlatMapFunction.class, 1);
		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coFlatMapper,
				CoFlatMapFunction.class, 2);

		return addCoFunction("coFlatMap", coFlatMapper, in1TypeWrapper, in2TypeWrapper,
				outTypeWrapper, new CoFlatMapInvokable<IN1, IN2, OUT>(coFlatMapper));
	}

	/**
	 * Applies a reduce transformation on a {@link ConnectedDataStream} and
	 * maps the outputs to a common type. The transformation calls
	 * {@link CoReduceFunction#reduce1} and {@link CoReduceFunction#map1} for
	 * each element of the first input and {@link CoReduceFunction#reduce2} and
	 * {@link CoReduceFunction#map2} for each element of the second input. This
	 * type of reduce is much faster than reduceGroup since the reduce function
	 * can be applied incrementally. The user can also extend
	 * {@link RichCoReduceFunction} to gain access to other features provided
	 * by the {@link RichFunction} interface.
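	 * <p>
	 * A minimal sketch (illustrative only; it assumes the signatures
	 * {@code IN1 reduce1(IN1, IN1)}, {@code IN2 reduce2(IN2, IN2)},
	 * {@code OUT map1(IN1)} and {@code OUT map2(IN2)} implied by the
	 * description above, and {@code connectedStream} stands for an already
	 * constructed {@code ConnectedDataStream<Integer, Long>}):
	 * 
	 * <pre>
	 * connectedStream.reduce(new CoReduceFunction&lt;Integer, Long, String&gt;() {
	 * 	public Integer reduce1(Integer value1, Integer value2) {
	 * 		return value1 + value2;
	 * 	}
	 * 
	 * 	public Long reduce2(Long value1, Long value2) {
	 * 		return value1 + value2;
	 * 	}
	 * 
	 * 	public String map1(Integer value) {
	 * 		return value.toString();
	 * 	}
	 * 
	 * 	public String map2(Long value) {
	 * 		return value.toString();
	 * 	}
	 * });
	 * </pre>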
	 * 
	 * @param coReducer
	 *            The {@link CoReduceFunction} that will be called for every
	 *            element of the inputs.
	 * @return The transformed DataStream.
	 */
	public <OUT> SingleOutputStreamOperator<OUT, ?> reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {

		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coReducer,
				CoReduceFunction.class, 0);
		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coReducer,
				CoReduceFunction.class, 1);
		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coReducer,
				CoReduceFunction.class, 2);
		if (this.isGrouped) {
			return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
					outTypeWrapper, new CoGroupedReduceInvokable<IN1, IN2, OUT>(coReducer,
							keyPosition1, keyPosition2));
		} else {
			return addCoFunction("coReduce", coReducer, in1TypeWrapper, in2TypeWrapper,
					outTypeWrapper, new CoReduceInvokable<IN1, IN2, OUT>(coReducer));
		}
	}

	/**
	 * Applies a CoGroup transformation on the connected DataStreams. The
	 * transformation calls the {@link CoGroupFunction#coGroup} method for
	 * time aligned windows of the two data streams. System time is used by
	 * default to compute the windows.
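	 * <p>
	 * A minimal sketch (illustrative only; {@code myCoGroupFunction} stands
	 * for a user-supplied {@link CoGroupFunction} and {@code connectedStream}
	 * for an already constructed connected stream): windows of 5000
	 * milliseconds that slide by 1000 milliseconds after every call:
	 * 
	 * <pre>
	 * connectedStream.windowReduceGroup(myCoGroupFunction, 5000, 1000);
	 * </pre>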
	 * 
	 * @param coGroupFunction
	 *            The {@link CoGroupFunction} that will be applied for the time
	 *            windows.
	 * @param windowSize
	 *            Size of the windows that will be aligned for both streams in
	 *            milliseconds.
	 * @param slideInterval
	 *            After every function call the windows will be slid by this
	 *            interval.
	 * 
	 * @return The transformed {@link DataStream}.
	 */
	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
			CoGroupFunction<IN1, IN2, OUT> coGroupFunction, long windowSize, long slideInterval) {
		return windowReduceGroup(coGroupFunction, windowSize, slideInterval,
				new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
	}

	/**
	 * Applies a CoGroup transformation on the connected DataStreams. The
	 * transformation calls the {@link CoGroupFunction#coGroup} method for
	 * time aligned windows of the two data streams. The user can implement
	 * their own time stamps or use the system time by default.
	 * 
	 * @param coGroupFunction
	 *            The {@link CoGroupFunction} that will be applied for the time
	 *            windows.
	 * @param windowSize
	 *            Size of the windows that will be aligned for both streams. If
	 *            system time is used it is milliseconds. User defined time
	 *            stamps are assumed to be monotonically increasing.
	 * @param slideInterval
	 *            After every function call the windows will be slid by this
	 *            interval.
	 * 
	 * @param timestamp1
	 *            User defined time stamps for the first input.
	 * @param timestamp2
	 *            User defined time stamps for the second input.
	 * @return The transformed {@link DataStream}.
	 */
	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduceGroup(
			CoGroupFunction<IN1, IN2, OUT> coGroupFunction, long windowSize, long slideInterval,
			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {

		if (windowSize < 1) {
			throw new IllegalArgumentException("Window size must be positive");
		}
		if (slideInterval < 1) {
			throw new IllegalArgumentException("Slide interval must be positive");
		}

		FunctionTypeWrapper<IN1> in1TypeWrapper = new FunctionTypeWrapper<IN1>(coGroupFunction,
				CoGroupFunction.class, 0);
		FunctionTypeWrapper<IN2> in2TypeWrapper = new FunctionTypeWrapper<IN2>(coGroupFunction,
				CoGroupFunction.class, 1);
		FunctionTypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(coGroupFunction,
				CoGroupFunction.class, 2);

		return addCoFunction("coWindowReduce", coGroupFunction, in1TypeWrapper, in2TypeWrapper,
				outTypeWrapper, new CoGroupInvokable<IN1, IN2, OUT>(coGroupFunction, windowSize,
						slideInterval, timestamp1, timestamp2));
	}

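	/**
	 * Internal helper shared by the co-transformations above: it creates the
	 * returned operator, registers a co-task for the given invokable in the
	 * JobGraph (serializing the user defined function), and connects both
	 * inputs to the new vertex.
	 */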
	protected <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String functionName,
			final Function function, TypeWrapper<IN1> in1TypeWrapper,
			TypeWrapper<IN2> in2TypeWrapper, TypeWrapper<OUT> outTypeWrapper,
			CoInvokable<IN1, IN2, OUT> functionInvokable) {

		@SuppressWarnings({ "unchecked", "rawtypes" })
		SingleOutputStreamOperator<OUT, ?> returnStream = new SingleOutputStreamOperator(
				environment, functionName, outTypeWrapper);

		try {
			input1.jobGraphBuilder.addCoTask(returnStream.getId(), functionInvokable,
					in1TypeWrapper, in2TypeWrapper, outTypeWrapper, functionName,
					SerializationUtils.serialize((Serializable) function),
					environment.getDegreeOfParallelism());
		} catch (SerializationException e) {
			throw new RuntimeException("Cannot serialize user defined function", e);
		}

		input1.connectGraph(input1, returnStream.getId(), 1);
		input1.connectGraph(input2, returnStream.getId(), 2);

		// TODO consider iteration

		return returnStream;
	}

}