提交 d6396081 编写于 作者: M manmat 提交者: Stephan Ewen

[streaming] SerializableStreamRecord added

上级 cd21bce6
language: java
jdk:
- oraclejdk8
- openjdk7
- openjdk6
script: "mvn -B clean verify"
......@@ -103,6 +103,20 @@
<argLine>-Xmx1024m</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.16</version>
<configuration>
<argLine>-Xmx1024m</argLine>
</configuration>
</plugin>
<plugin>
<!-- plugin that tests whether the code style is appropriate -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.6</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
......@@ -117,6 +131,28 @@
<downloadJavadocs>true</downloadJavadocs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce-maven</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireMavenVersion>
<version>(,2.1.0),(2.1.0,2.2.0),(2.2.0,)</version>
<message>Maven 2.1.0 and 2.2.0 produce incorrect GPG signatures and checksums respectively.</message>
</requireMavenVersion>
</rules>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
......@@ -158,6 +194,18 @@
<argLine>-Xmx1024m</argLine>
</configuration>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.7</version>
<configuration>
<systemPropertyVariables>
<log.level>WARN</log.level>
</systemPropertyVariables>
<forkMode>always</forkMode>
<threadCount>1</threadCount>
<perCoreThreadCount>false</perCoreThreadCount>
</configuration>
</plugin>
</plugins>
</build>
</project>
package eu.stratosphere.streaming.api;
import java.util.Map;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
public class AckEventListener implements EventListener {
private String taskInstanceID;
private FaultTolerancyBuffer recordBuffer;
private Map<String, StreamRecord> recordBuffer;
public AckEventListener(String taskInstanceID,FaultTolerancyBuffer recordBuffer) {
public AckEventListener(String taskInstanceID,Map<String, StreamRecord> recordBuffer) {
this.taskInstanceID=taskInstanceID;
this.recordBuffer=recordBuffer;
}
......@@ -19,8 +21,8 @@ public class AckEventListener implements EventListener {
String ackCID = recordId.split("-", 2)[0];
if (ackCID.equals(taskInstanceID)) {
System.out.println("Ack recieved " + ackEvent.getRecordId());
recordBuffer.ackRecord(ackEvent.getRecordId());
System.out.println(recordBuffer.getRecordBuffer());
recordBuffer.remove(ackEvent.getRecordId());
System.out.println(recordBuffer);
System.out.println("---------------------");
}
......
package eu.stratosphere.streaming.api;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
public class FailEvent extends AbstractTaskEvent {
private String recordId;
public FailEvent(String recordId) {
setRecordId(recordId);
System.out.println("Fail sent " + recordId);
System.out.println("---------------------");
}
@Override
public void write(DataOutput out) throws IOException {}
@Override
public void read(DataInput in) throws IOException {}
public void setRecordId(String recordId) {
this.recordId = recordId;
}
public String getRecordId() {
return this.recordId;
}
}
\ No newline at end of file
package eu.stratosphere.streaming.api;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
public class FailEventListener implements EventListener {
private String taskInstanceID;
private FaultTolerancyBuffer recordBuffer;
public FailEventListener(String taskInstanceID,
FaultTolerancyBuffer recordBuffer) {
this.taskInstanceID = taskInstanceID;
this.recordBuffer = recordBuffer;
}
public void eventOccurred(AbstractTaskEvent event) {
FailEvent failEvent = (FailEvent) event;
String recordId = failEvent.getRecordId();
String failCID = recordId.split("-", 2)[0];
if (failCID.equals(taskInstanceID)) {
System.out.println("Fail recieved " + recordId);
recordBuffer.failRecord(recordId);
System.out.println(recordBuffer.getRecordBuffer());
System.out.println("---------------------");
}
}
}
package eu.stratosphere.streaming.api;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.Record;
public class FaultTolerancyBuffer {
private Map<String, StreamRecord> recordBuffer;
private Map<String, Integer> ackCounter;
private List<RecordWriter<Record>> outputs;
private String channelID;
private int numberOfOutputs;
public FaultTolerancyBuffer(List<RecordWriter<Record>> outputs, String channelID) {
this.outputs=outputs;
this.recordBuffer = new HashMap<String, StreamRecord>();
this.ackCounter = new HashMap<String, Integer>();
this.numberOfOutputs = outputs.size();
this.channelID=channelID;
}
public void addRecord(StreamRecord streamRecord) {
recordBuffer.put(streamRecord.getId(), streamRecord);
ackCounter.put(streamRecord.getId(), numberOfOutputs);
}
public Record popRecord(String recordID) {
Record record = recordBuffer.get(recordID).getRecord();
recordBuffer.remove(recordID);
ackCounter.remove(recordID);
return record;
}
public void ackRecord(String recordID) {
if (ackCounter.containsKey(recordID)) {
int ackCount = ackCounter.get(recordID) - 1;
ackCounter.put(recordID, ackCount);
if (ackCount == 0) {
recordBuffer.remove(recordID);
ackCounter.remove(recordID);
}
}
}
public void failRecord(String recordID) {
// Create new id to avoid double counting acks
StreamRecord newRecord = new StreamRecord(popRecord(recordID),channelID).addId();
addRecord(newRecord);
reEmit(newRecord.getRecordWithId());
}
public void reEmit(Record record) {
for (RecordWriter<Record> output : outputs) {
try {
output.emit(record);
} catch (Exception e) {
System.out.println("Re-emit failed");
}
}
}
public Map<String, StreamRecord> getRecordBuffer() {
return this.recordBuffer;
}
}
......@@ -31,9 +31,6 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamSink;
import eu.stratosphere.streaming.api.streamcomponent.StreamSource;
import eu.stratosphere.streaming.api.streamcomponent.StreamTask;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
......
package eu.stratosphere.streaming.api;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Random;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class SerializableStreamRecord implements IOReadableWritable, Serializable {
private static final long serialVersionUID = 1L;
private Value[] fields;
private StringValue uid = new StringValue("");
private String channelID = "";
private int numOfFields;
public SerializableStreamRecord(int length) {
this.numOfFields = length;
fields = new Value[length];
}
public SerializableStreamRecord(int length, String channelID) {
this(length);
this.channelID = channelID;
}
public int getNumOfFields() {
return numOfFields;
}
public SerializableStreamRecord setId() {
Random rnd = new Random();
uid.setValue(channelID + "-" + rnd.nextInt(1000));
return this;
}
public String getId() {
return uid.getValue();
}
public Value getField(int fieldNumber) {
return fields[fieldNumber];
}
public void setField(int fieldNumber, Value value) {
fields[fieldNumber] = value;
}
@Override
public void write(DataOutput out) throws IOException {
uid.write(out);
// Write the number of fields with an IntValue
(new IntValue(numOfFields)).write(out);
// Write the fields
for (int i = 0; i < numOfFields; i++) {
fields[i].write(out);
}
}
@Override
public void read(DataInput in) throws IOException {
uid.read(in);
// Get the number of fields
IntValue numOfFieldsValue = new IntValue(0);
numOfFieldsValue.read(in);
numOfFields = numOfFieldsValue.getValue();
// Make sure the fields have numOfFields elements
fields = new Value[numOfFields];
// Read the fields
for (int i = 0; i < numOfFields; i++) {
fields[i].read(in);
}
}
}
package eu.stratosphere.streaming.api.streamcomponent;
package eu.stratosphere.streaming.api;
import java.util.List;
import java.util.Map;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.AckEventListener;
import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.FailEventListener;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public final class StreamComponentHelper<T extends AbstractInvokable> {
public final class StreamComponentFactory {
public void setAckListener(FaultTolerancyBuffer recordBuffer,
public static void setAckListener(Map<String, StreamRecord> recordBuffer,
String sourceInstanceID, List<RecordWriter<Record>> outputs) {
EventListener eventListener = new AckEventListener(sourceInstanceID,
recordBuffer);
......@@ -35,91 +26,68 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
public void setFailListener(FaultTolerancyBuffer recordBuffer,
String sourceInstanceID, List<RecordWriter<Record>> outputs) {
EventListener eventListener = new FailEventListener(sourceInstanceID,
recordBuffer);
for (RecordWriter<Record> output : outputs) {
// TODO: separate outputs
output.subscribeToEvent(eventListener, FailEvent.class);
// for StreamTask
public static int setConfigInputs(StreamTask taskBase,
Configuration taskConfiguration, List<RecordReader<Record>> inputs) {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
inputs.add(new RecordReader<Record>(taskBase, Record.class));
}
return numberOfInputs;
}
public void setConfigInputs(T taskBase, Configuration taskConfiguration,
List<RecordReader<Record>> inputs) throws Exception {
// this function can be removed as duplication of the above function if
// modification on kernel is allowed.
// for StreamSink
public static int setConfigInputs(StreamSink taskBase,
Configuration taskConfiguration, List<RecordReader<Record>> inputs) {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
if (taskBase instanceof StreamTask) {
inputs.add(new RecordReader<Record>((StreamTask) taskBase,
Record.class));
} else if (taskBase instanceof StreamSink) {
inputs.add(new RecordReader<Record>((StreamSink) taskBase,
Record.class));
} else {
throw new Exception(
"Nonsupported object passed to setConfigInputs");
}
inputs.add(new RecordReader<Record>(taskBase, Record.class));
}
return numberOfInputs;
}
public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
//for StreamTask
public static int setConfigOutputs(StreamTask taskBase,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs,
List<ChannelSelector<Record>> partitioners) throws Exception {
List<ChannelSelector<Record>> partitioners) {
int numberOfOutputs = taskConfiguration
.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
setPartitioner(taskConfiguration, i, partitioners);
StreamComponentFactory.setPartitioner(taskConfiguration, i,
partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<Record>((StreamTask) taskBase,
Record.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<Record>((StreamSource) taskBase,
Record.class, outputPartitioner));
} else {
throw new Exception(
"Nonsupported object passed to setConfigOutputs");
}
outputs.add(new RecordWriter<Record>(taskBase, Record.class,
outputPartitioner));
}
return numberOfOutputs;
}
public UserSinkInvokable getUserFunction(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultSinkInvokable.class,
UserSinkInvokable.class);
UserSinkInvokable userFunction = null;
try {
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
// this function can be removed as duplication of the above function if
// modification on kernel is allowed.
// for StreamSource
public static int setConfigOutputs(StreamSource taskBase,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs,
List<ChannelSelector<Record>> partitioners) {
int numberOfOutputs = taskConfiguration
.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
StreamComponentFactory.setPartitioner(taskConfiguration, i,
partitioners);
}
return userFunction;
}
public StreamInvokable getUserFunction(Configuration taskConfiguration,
List<RecordWriter<Record>> outputs, String instanceID,
FaultTolerancyBuffer recordBuffer) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends StreamInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultTaskInvokable.class,
StreamInvokable.class);
StreamInvokable userFunction = null;
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, recordBuffer);
} catch (Exception e) {
for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(taskBase, Record.class,
outputPartitioner));
}
return userFunction;
return numberOfOutputs;
}
private void setPartitioner(Configuration taskConfiguration, int nrOutput,
List<ChannelSelector<Record>> partitioners) {
public static void setPartitioner(Configuration taskConfiguration,
int nrOutput, List<ChannelSelector<Record>> partitioners) {
Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration
.getClass("partitionerClass_" + nrOutput,
DefaultPartitioner.class, ChannelSelector.class);
......@@ -144,5 +112,4 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
System.out.println(e);
}
}
}
\ No newline at end of file
......@@ -13,7 +13,7 @@ public final class StreamRecord {
private String channelID = "";
public StreamRecord(Record record) {
this.record = record.createCopy();
this.record = record;
}
public StreamRecord(Record record, String channelID) {
......@@ -40,13 +40,7 @@ public final class StreamRecord {
}
public Record getRecord() {
Record newRecord=this.record.createCopy();
newRecord.removeField(newRecord.getNumFields() - 1);
return newRecord;
}
public Record getRecordWithId() {
return this.record;
return record;
}
// TODO:write proper toString
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
......@@ -21,8 +21,7 @@ import java.util.List;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.Record;
......@@ -30,24 +29,33 @@ public class StreamSink extends AbstractOutputTask {
private List<RecordReader<Record>> inputs;
private UserSinkInvokable userFunction;
private int numberOfInputs;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
inputs = new LinkedList<RecordReader<Record>>();
userFunction = null;
numberOfInputs = 0;
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
StreamComponentHelper<StreamSink> streamSinkHelper = new StreamComponentHelper<StreamSink>();
public void setUserFunction(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultSinkInvokable.class,
UserSinkInvokable.class);
try {
streamSinkHelper.setConfigInputs(this, taskConfiguration, inputs);
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
e.printStackTrace();
}
userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
// setConfigInputs(taskConfiguration);
numberOfInputs = StreamComponentFactory.setConfigInputs(this,
taskConfiguration, inputs);
setUserFunction(taskConfiguration);
}
@Override
......@@ -59,7 +67,7 @@ public class StreamSink extends AbstractOutputTask {
if (input.hasNext()) {
hasInput = true;
StreamRecord rec = new StreamRecord(input.next());
String id = rec.getId();
String id = rec.popId();
userFunction.invoke(rec.getRecord());
input.publishEvent(new AckEvent(id));
}
......
......@@ -13,16 +13,18 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.test.RandIS;
import eu.stratosphere.types.Record;
......@@ -32,17 +34,21 @@ public class StreamSource extends AbstractInputTask<RandIS> {
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private UserSourceInvokable userFunction;
private int numberOfOutputs;
private static int numSources = 0;
private String sourceInstanceID;
private FaultTolerancyBuffer recordBuffer;
private Map<String, StreamRecord> recordBuffer;
public StreamSource() {
// TODO: Make configuration file visible and call setClassInputs() here
outputs = new LinkedList<RecordWriter<Record>>();
partitioners = new LinkedList<ChannelSelector<Record>>();
userFunction = null;
numberOfOutputs = 0;
numSources++;
sourceInstanceID = Integer.toString(numSources);
recordBuffer = new TreeMap<String, StreamRecord>();
}
......@@ -56,25 +62,26 @@ public class StreamSource extends AbstractInputTask<RandIS> {
return null;
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
StreamComponentHelper<StreamSource> streamSourceHelper = new StreamComponentHelper<StreamSource>();
public void setUserFunction(Configuration taskConfiguration) {
Class<? extends UserSourceInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultSourceInvokable.class,
UserSourceInvokable.class);
try {
streamSourceHelper.setConfigOutputs(this, taskConfiguration,
outputs, partitioners);
userFunction = userFunctionClass.newInstance();
userFunction
.declareOutputs(outputs, sourceInstanceID, recordBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
recordBuffer = new FaultTolerancyBuffer(outputs, sourceInstanceID);
userFunction = (UserSourceInvokable) streamSourceHelper
.getUserFunction(taskConfiguration, outputs, sourceInstanceID,
recordBuffer);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID,
outputs);
streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID,
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
taskConfiguration, outputs, partitioners);
setUserFunction(taskConfiguration);
StreamComponentFactory.setAckListener(recordBuffer, sourceInstanceID,
outputs);
}
......@@ -84,7 +91,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
userFunction.invoke();
System.out.println(this.getClass().getName() + "-" + sourceInstanceID);
System.out.println(recordBuffer.getRecordBuffer());
System.out.println(recordBuffer.toString());
System.out.println("---------------------");
}
......
......@@ -13,33 +13,35 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.Record;
//TODO: Refactor, create common ancestor with StreamSource
public class StreamTask extends AbstractTask {
private List<RecordReader<Record>> inputs;
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private UserTaskInvokable userFunction;
private int numberOfInputs;
private int numberOfOutputs;
private static int numTasks = 0;
private String taskInstanceID = "";
private FaultTolerancyBuffer recordBuffer;
private Map<String, StreamRecord> recordBuffer;
public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here
......@@ -47,31 +49,36 @@ public class StreamTask extends AbstractTask {
outputs = new LinkedList<RecordWriter<Record>>();
partitioners = new LinkedList<ChannelSelector<Record>>();
userFunction = null;
numberOfInputs = 0;
numberOfOutputs = 0;
numTasks++;
taskInstanceID = Integer.toString(numTasks);
recordBuffer = new TreeMap<String, StreamRecord>();
}
public void setUserFunction(Configuration taskConfiguration) {
Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultTaskInvokable.class,
UserTaskInvokable.class);
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, taskInstanceID, recordBuffer);
} catch (Exception e) {
}
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
StreamComponentHelper<StreamTask> streamTaskHelper = new StreamComponentHelper<StreamTask>();
try {
streamTaskHelper.setConfigInputs(this, taskConfiguration, inputs);
streamTaskHelper.setConfigOutputs(this, taskConfiguration, outputs,
partitioners);
} catch (Exception e) {
e.printStackTrace();
}
numberOfInputs = StreamComponentFactory.setConfigInputs(this,
taskConfiguration, inputs);
numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
taskConfiguration, outputs, partitioners);
recordBuffer = new FaultTolerancyBuffer(outputs, taskInstanceID);
userFunction = (UserTaskInvokable) streamTaskHelper
.getUserFunction(taskConfiguration, outputs, taskInstanceID,
recordBuffer);
streamTaskHelper.setAckListener(recordBuffer, taskInstanceID,
outputs);
streamTaskHelper.setFailListener(recordBuffer, taskInstanceID,
setUserFunction(taskConfiguration);
StreamComponentFactory.setAckListener(recordBuffer, taskInstanceID,
outputs);
}
......@@ -84,20 +91,15 @@ public class StreamTask extends AbstractTask {
if (input.hasNext()) {
hasInput = true;
StreamRecord streamRecord = new StreamRecord(input.next());
String id = streamRecord.getId();
String id = streamRecord.popId();
// TODO: Enclose invoke in try-catch to properly fail
// records
try {
userFunction.invoke(streamRecord.getRecord());
System.out.println(this.getClass().getName() + "-"
+ taskInstanceID);
System.out.println(recordBuffer.getRecordBuffer());
System.out.println("---------------------");
input.publishEvent(new AckEvent(id));
} catch (Exception e) {
input.publishEvent(new FailEvent(id));
}
userFunction.invoke(streamRecord.getRecord());
System.out.println(this.getClass().getName() + "-"
+ taskInstanceID);
System.out.println(recordBuffer.toString());
System.out.println("---------------------");
input.publishEvent(new AckEvent(id));
}
}
}
......
package eu.stratosphere.streaming.api.invokable;
import java.util.List;
import java.util.Map;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.Record;
......@@ -11,10 +12,10 @@ public abstract class StreamInvokable {
private List<RecordWriter<Record>> outputs;
protected String channelID;
private FaultTolerancyBuffer emittedRecords;
private Map<String, StreamRecord> emittedRecords;
public final void declareOutputs(List<RecordWriter<Record>> outputs,
String channelID, FaultTolerancyBuffer emittedRecords) {
String channelID, Map<String, StreamRecord> emittedRecords) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
......@@ -23,12 +24,12 @@ public abstract class StreamInvokable {
public final void emit(Record record) {
StreamRecord streamRecord = new StreamRecord(record, channelID).addId();
emittedRecords.addRecord(streamRecord);
emittedRecords.put(streamRecord.getId(), streamRecord);
for (RecordWriter<Record> output : outputs) {
try {
output.emit(streamRecord.getRecordWithId());
output.emit(streamRecord.getRecord());
System.out.println(this.getClass().getName());
System.out.println("Emitted " + streamRecord.getId() + "-"
......@@ -37,7 +38,6 @@ public abstract class StreamInvokable {
} catch (Exception e) {
System.out.println("Emit error");
emittedRecords.failRecord(streamRecord.getId());
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册