StreamWindowTask.java 3.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

package eu.stratosphere.streaming.api.streamcomponent;

import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
M
Márton Balassi 已提交
21
import eu.stratosphere.streaming.state.WindowState;
22 23

public class StreamWindowTask extends UserTaskInvokable {
24 25
	private static final long serialVersionUID = 1L;
	
26 27 28 29
	private int computeGranularity;
	private int windowFieldId = 1;

	private StreamRecord tempRecord;
M
Márton Balassi 已提交
30
	private WindowState<Integer> window;
31 32 33 34 35 36 37 38
	private MutableTableState<String, Integer> sum;
	private long initTimestamp = -1;
	private long nextTimestamp = -1;

	public StreamWindowTask(int windowSize, int slidingStep,
			int computeGranularity, int windowFieldId) {
		this.computeGranularity = computeGranularity;
		this.windowFieldId = windowFieldId;
M
Márton Balassi 已提交
39
		window = new WindowState<Integer>(windowSize, slidingStep,
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
				computeGranularity);
		sum = new MutableTableState<String, Integer>();
		sum.put("sum", 0);
	}

	private void incrementCompute(StreamRecord record){}
	private void decrementCompute(StreamRecord record){}
	private void produceRecord(long progress){}

	@Override
	public void invoke(StreamRecord record) throws Exception {
		int numTuple = record.getNumOfTuples();
		for (int i = 0; i < numTuple; ++i) {
			long progress = record.getLong(i, windowFieldId);
			if (initTimestamp == -1) {
				initTimestamp = progress;
				nextTimestamp = initTimestamp + computeGranularity;
				tempRecord = new StreamRecord(record.getNumOfFields());
			} else {
				if (progress > nextTimestamp) {
					if (window.isFull()) {
						StreamRecord expiredRecord = window.popFront();
						incrementCompute(tempRecord);
						decrementCompute(expiredRecord);
						window.pushBack(tempRecord);
						if (window.isEmittable()) {
							produceRecord(progress);
						}
					} else {
						incrementCompute(tempRecord);
						window.pushBack(tempRecord);
						if (window.isFull()) {
							produceRecord(progress);
						}
					}
					initTimestamp = nextTimestamp;
					nextTimestamp = initTimestamp + computeGranularity;
					tempRecord = new StreamRecord(record.getNumOfFields());
				}
				tempRecord.addTuple(record.getTuple(i));
			}
		}
	}
}