StreamWindowTask.java 3.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

package eu.stratosphere.streaming.api.streamcomponent;

18 19 20 21
import java.util.ArrayList;

import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.tuple.Tuple;
22
import eu.stratosphere.streaming.state.MutableTableState;
Y
Yingjun Wu 已提交
23
import eu.stratosphere.streaming.state.SlidingWindowState;
24
import eu.stratosphere.util.Collector;
25

26 27
public class StreamWindowTask extends FlatMapFunction<Tuple, Tuple> {
	private static final long serialVersionUID = 1L;
28

29
	private int computeGranularity;
30
	private int windowFieldId = 1;
31

32 33 34
	private ArrayList tempArrayList;
	private SlidingWindowState window;
	private MutableTableState<String, Integer> sum;
35 36 37 38 39 40 41
	private long initTimestamp = -1;
	private long nextTimestamp = -1;

	public StreamWindowTask(int windowSize, int slidingStep,
			int computeGranularity, int windowFieldId) {
		this.computeGranularity = computeGranularity;
		this.windowFieldId = windowFieldId;
42
		window = new SlidingWindowState(windowSize, slidingStep,
43
				computeGranularity);
44 45
		sum = new MutableTableState<String, Integer>();
		sum.put("sum", 0);
46 47
	}

48
	private void incrementCompute(ArrayList tupleArray) {}
49

50
	private void decrementCompute(ArrayList tupleArray) {}
51

52
	private void produceOutput(long progress, Collector out) {}
53 54

	@Override
55
	public void flatMap(Tuple value, Collector<Tuple> out) throws Exception {
M
Márton Balassi 已提交
56
		long progress = (Long) value.getField(windowFieldId);
57 58 59
		if (initTimestamp == -1) {
			initTimestamp = progress;
			nextTimestamp = initTimestamp + computeGranularity;
60
			tempArrayList = new ArrayList();
61 62 63
		} else {
			if (progress > nextTimestamp) {
				if (window.isFull()) {
64 65 66 67
					ArrayList expiredArrayList = window.popFront();
					incrementCompute(tempArrayList);
					decrementCompute(expiredArrayList);
					window.pushBack(tempArrayList);
68 69 70 71
					if (window.isEmittable()) {
						produceOutput(progress, out);
					}
				} else {
72 73
					incrementCompute(tempArrayList);
					window.pushBack(tempArrayList);
74
					if (window.isFull()) {
75
						produceOutput(progress, out);
76 77
					}
				}
78 79
				initTimestamp = nextTimestamp;
				nextTimestamp = initTimestamp + computeGranularity;
80
				tempArrayList = new ArrayList();
81
			}
82 83
			tempArrayList.add(value);
		}
84 85
	}
}