FailEventListener.java 2.5 KB
Newer Older
G
Gyula Fora 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

G
gyfora 已提交
16
package eu.stratosphere.streaming.faulttolerance;
17

18 19 20
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

21 22
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
23
import eu.stratosphere.streaming.api.streamrecord.UID;
24

G
Gyula Fora 已提交
25 26 27 28
/**
 * EventListener for record fail events. When a FailEvent occurs, uses the
 * task's fault tolerance buffer to fail and re-emit the given record.
 */
29 30
public class FailEventListener implements EventListener {

31 32
	private static final Log log = LogFactory.getLog(FailEventListener.class);

33
	private int taskInstanceID;
G
gyfora 已提交
34
	private FaultToleranceUtil recordBuffer;
35
	private int output;
36

G
Gyula Fora 已提交
37
	/**
38 39
	 * Creates a FailEventListener that monitors FailEvents sent to task with
	 * the given ID.
G
Gyula Fora 已提交
40
	 * 
41
	 * @param sourceInstanceID
42
	 *            ID of the task that creates the listener
G
Gyula Fora 已提交
43
	 * @param recordBuffer
44
	 *            The fault tolerance buffer associated with this task
45 46
	 * @param output
	 *            output channel
G
Gyula Fora 已提交
47
	 */
48 49
	public FailEventListener(int sourceInstanceID, FaultToleranceUtil recordBuffer, int output) {
		this.taskInstanceID = sourceInstanceID;
50
		this.recordBuffer = recordBuffer;
51
		this.output = output;
52 53
	}

G
Gyula Fora 已提交
54 55 56 57 58
	/**
	 * When a FailEvent occurs checks if it was directed at this task, if so,
	 * fails the record given in the FailEvent
	 * 
	 */
59 60
	public void eventOccurred(AbstractTaskEvent event) {
		FailEvent failEvent = (FailEvent) event;
61 62 63
		UID recordId = failEvent.getRecordId();
		int failCID = recordId.getChannelId();
		if (failCID == taskInstanceID) {
64
			recordBuffer.failRecord(recordId, output);
G
gyfora 已提交
65 66 67
			if (log.isWarnEnabled()) {
				log.warn("FAIL RECIEVED: " + output + " " + recordId);
			}
68 69 70 71
		}

	}
}