FaultToleranceBuffer.java 5.0 KB
Newer Older
G
gyfora 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

package eu.stratosphere.streaming.faulttolerance;

import java.util.HashSet;
G
gyfora 已提交
19 20
import java.util.LinkedList;
import java.util.List;
G
gyfora 已提交
21 22 23
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
24 25
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
G
gyfora 已提交
26 27 28 29 30

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
31
import eu.stratosphere.streaming.api.streamrecord.UID;
G
gyfora 已提交
32 33 34

public abstract class FaultToleranceBuffer {

35
	private static final Log log = LogFactory.getLog(FaultToleranceBuffer.class);
G
gyfora 已提交
36

37 38 39
	protected Map<UID, StreamRecord> recordBuffer;
	protected Map<UID, Long> recordTimestamps;
	protected SortedMap<Long, Set<UID>> recordsByTime;
G
gyfora 已提交
40 41 42 43

	protected int[] numberOfEffectiveChannels;
	protected int totalNumberOfEffectiveChannels;
	protected Long timeOfLastUpdate;
44
	protected int componentInstanceID;
G
gyfora 已提交
45

G
gyfora 已提交
46 47
	long timeout = 30000;

48
	public FaultToleranceBuffer(int[] numberOfChannels, int componentInstanceID) {
G
gyfora 已提交
49 50 51 52 53 54
		this.numberOfEffectiveChannels = numberOfChannels;
		totalNumberOfEffectiveChannels = 0;
		for (int i : numberOfChannels) {
			totalNumberOfEffectiveChannels += i;
		}

55
		this.componentInstanceID = componentInstanceID;
G
gyfora 已提交
56 57
		this.timeOfLastUpdate = System.currentTimeMillis();

58 59 60
		this.recordBuffer = new ConcurrentHashMap<UID, StreamRecord>();
		this.recordsByTime = new ConcurrentSkipListMap<Long, Set<UID>>();
		this.recordTimestamps = new ConcurrentHashMap<UID, Long>();
G
gyfora 已提交
61 62
	}

63
	public void add(StreamRecord streamRecord) {
G
gyfora 已提交
64 65

		StreamRecord record = streamRecord.copy();
66
		UID id = record.getId();
67

G
gyfora 已提交
68 69
		recordBuffer.put(id, record);

G
gyfora 已提交
70
		addTimestamp(id);
G
gyfora 已提交
71 72
		addToAckCounter(id);

G
gyfora 已提交
73 74 75
		if (log.isTraceEnabled()) {
			log.trace("Record added to buffer: " + id);
		}
76 77
	}

78
	public void add(StreamRecord streamRecord, int channel) {
79 80 81

		StreamRecord record = streamRecord.copy();

82
		UID id = record.getId();
83 84 85 86
		recordBuffer.put(id, record);
		addTimestamp(id);

		addToAckCounter(id, channel);
G
gyfora 已提交
87

G
gyfora 已提交
88 89 90
		if (log.isTraceEnabled()) {
			log.trace("Record added to buffer: " + id);
		}
G
gyfora 已提交
91 92
	}

93
	protected abstract void addToAckCounter(UID id);
G
gyfora 已提交
94

95
	protected void addToAckCounter(UID id, int channel) {
96 97 98
		addToAckCounter(id);
	}

99
	protected abstract boolean removeFromAckCounter(UID uid);
100

101
	protected abstract void ack(UID id, int channel);
102

G
gyfora 已提交
103
	// TODO:count fails
104 105 106
	protected StreamRecord fail(UID uid) {
		if (recordBuffer.containsKey(uid)) {
			StreamRecord newRecord = remove(uid).setId(componentInstanceID);
G
gyfora 已提交
107 108 109 110 111
			add(newRecord);
			return newRecord;
		} else {
			return null;
		}
G
gyfora 已提交
112
	}
113

114
	protected abstract StreamRecord failChannel(UID id, int channel);
G
gyfora 已提交
115

116
	protected void addTimestamp(UID id) {
G
gyfora 已提交
117 118
		Long currentTime = System.currentTimeMillis();

G
gyfora 已提交
119
		recordTimestamps.put(id, currentTime);
120

121
		Set<UID> recordSet = recordsByTime.get(currentTime);
G
gyfora 已提交
122 123

		if (recordSet == null) {
124
			recordSet = new HashSet<UID>();
G
gyfora 已提交
125 126 127 128
			recordsByTime.put(currentTime, recordSet);
		}

		recordSet.add(id);
G
gyfora 已提交
129

G
gyfora 已提交
130 131
	}

132
	public StreamRecord remove(UID uid) {
G
gyfora 已提交
133

134
		if (removeFromAckCounter(uid)) {
135

136
			recordsByTime.get(recordTimestamps.remove(uid)).remove(uid);
137

G
gyfora 已提交
138 139 140
			if (log.isTraceEnabled()) {
				log.trace("Record removed from buffer: " + uid);
			}
141
			return recordBuffer.remove(uid);
G
gyfora 已提交
142
		} else {
G
gyfora 已提交
143 144 145
			if (log.isWarnEnabled()) {
				log.warn("Record ALREADY REMOVED from buffer: " + uid);
			}
G
gyfora 已提交
146 147 148 149 150
			return null;
		}

	}

G
gyfora 已提交
151
	// TODO:test this
152
	public List<UID> timeoutRecords(Long currentTime) {
G
gyfora 已提交
153
		if (timeOfLastUpdate + timeout < currentTime) {
G
gyfora 已提交
154 155 156
			if (log.isTraceEnabled()) {
				log.trace("Updating record buffer");
			}
157 158
			List<UID> timedOutRecords = new LinkedList<UID>();
			Map<Long, Set<UID>> timedOut = recordsByTime.subMap(0L, currentTime - timeout);
G
gyfora 已提交
159

160
			for (Set<UID> recordSet : timedOut.values()) {
G
gyfora 已提交
161
				if (!recordSet.isEmpty()) {
162
					for (UID recordID : recordSet) {
G
gyfora 已提交
163 164 165 166 167
						timedOutRecords.add(recordID);
					}
				}
			}

168
			for (UID recordID : timedOutRecords) {
G
gyfora 已提交
169 170 171 172 173 174 175 176 177 178 179
				fail(recordID);
			}

			timedOut.clear();

			timeOfLastUpdate = currentTime;
			return timedOutRecords;
		}
		return null;
	}

G
gyfora 已提交
180
}