StreamWindowTask.java 3.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

package eu.stratosphere.streaming.api.streamcomponent;

M
Márton Balassi 已提交
18 19 20 21
import eu.stratosphere.streaming.api.StreamCollector;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
22
import eu.stratosphere.streaming.state.MutableTableState;
Y
Yingjun Wu 已提交
23
import eu.stratosphere.streaming.state.SlidingWindowState;
24

M
Márton Balassi 已提交
25
public class StreamWindowTask extends UserTaskInvokable {
26
	private static final long serialVersionUID = 1L;
M
Márton Balassi 已提交
27
	
28 29 30
	private int computeGranularity;
	private int windowFieldId = 1;

M
Márton Balassi 已提交
31 32
	private StreamRecord tempRecord;
	private SlidingWindowState<Integer> window;
33 34 35 36 37 38 39 40
	private MutableTableState<String, Integer> sum;
	private long initTimestamp = -1;
	private long nextTimestamp = -1;

	public StreamWindowTask(int windowSize, int slidingStep,
			int computeGranularity, int windowFieldId) {
		this.computeGranularity = computeGranularity;
		this.windowFieldId = windowFieldId;
M
Márton Balassi 已提交
41
		window = new SlidingWindowState<Integer>(windowSize, slidingStep,
42 43 44 45 46
				computeGranularity);
		sum = new MutableTableState<String, Integer>();
		sum.put("sum", 0);
	}

M
Márton Balassi 已提交
47 48 49
	private void incrementCompute(StreamRecord record){}
	private void decrementCompute(StreamRecord record){}
	private void produceRecord(long progress){}
50 51

	@Override
M
Márton Balassi 已提交
52 53 54 55 56 57 58 59 60 61 62
	public void invoke(StreamRecord record, StreamCollector collector) throws Exception {
		int numTuple = record.getBatchSize();
		int tupleIndex = 0;
		for (int i = 0; i < numTuple; ++i) {
			long progress = record.getTuple(i).getField(windowFieldId);
			if (initTimestamp == -1) {
				initTimestamp = progress;
				nextTimestamp = initTimestamp + computeGranularity;
				tempRecord = new ArrayStreamRecord(record.getBatchSize());
			} else {
				if (progress > nextTimestamp) {
63
					if (window.isFull()) {
M
Márton Balassi 已提交
64 65 66 67 68 69 70 71 72 73 74 75 76
						StreamRecord expiredRecord = window.popFront();
						incrementCompute(tempRecord);
						decrementCompute(expiredRecord);
						window.pushBack(tempRecord);
						if (window.isEmittable()) {
							produceRecord(progress);
						}
					} else {
						incrementCompute(tempRecord);
						window.pushBack(tempRecord);
						if (window.isFull()) {
							produceRecord(progress);
						}
77
					}
M
Márton Balassi 已提交
78 79 80
					initTimestamp = nextTimestamp;
					nextTimestamp = initTimestamp + computeGranularity;
					tempRecord = new ArrayStreamRecord(record.getBatchSize());
81
				}
M
Márton Balassi 已提交
82
				tempRecord.setTuple(tupleIndex++, record.getTuple(i));
83
			}
M
Márton Balassi 已提交
84
		}		
85 86
	}
}