提交 960cecc9 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] Fail event added

上级 fecbe358
package eu.stratosphere.streaming.api;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
public class FailEvent extends AbstractTaskEvent {
private String recordId;
public FailEvent(String recordId) {
setRecordId(recordId);
System.out.println("Fail sent " + recordId);
System.out.println("---------------------");
}
@Override
public void write(DataOutput out) throws IOException {}
@Override
public void read(DataInput in) throws IOException {}
public void setRecordId(String recordId) {
this.recordId = recordId;
}
public String getRecordId() {
return this.recordId;
}
}
\ No newline at end of file
package eu.stratosphere.streaming.api;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
public class FailEventListener implements EventListener {
private String taskInstanceID;
private FaultTolerancyBuffer recordBuffer;
public FailEventListener(String taskInstanceID,
FaultTolerancyBuffer recordBuffer) {
this.taskInstanceID = taskInstanceID;
this.recordBuffer = recordBuffer;
}
public void eventOccurred(AbstractTaskEvent event) {
FailEvent failEvent = (FailEvent) event;
String recordId = failEvent.getRecordId();
String failCID = recordId.split("-", 2)[0];
if (failCID.equals(taskInstanceID)) {
System.out.println("Fail recieved " + recordId);
recordBuffer.failRecord(recordId);
System.out.println(recordBuffer.getRecordBuffer());
System.out.println("---------------------");
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册