提交 2a6e93dd 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] improved threadsafe publishing

上级 7bff50f1
......@@ -8,24 +8,28 @@ import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
public class AckEvent extends AbstractTaskEvent {
private String recordId;
public AckEvent(String recordId) {
setRecordId(recordId);
System.out.println("Ack sent " + recordId);
System.out.println("---------------------");
}
@Override
public void write(DataOutput out) throws IOException {}
public void write(DataOutput out) throws IOException {
}
@Override
public void read(DataInput in) throws IOException {}
public void read(DataInput in) throws IOException {
}
public void setRecordId(String recordId) {
this.recordId = recordId;
}
public String getRecordId() {
return this.recordId;
}
}
\ No newline at end of file
......@@ -8,21 +8,22 @@ public class AckEventListener implements EventListener {
private String taskInstanceID;
private FaultTolerancyBuffer recordBuffer;
public AckEventListener(String taskInstanceID,FaultTolerancyBuffer recordBuffer) {
this.taskInstanceID=taskInstanceID;
this.recordBuffer=recordBuffer;
public AckEventListener(String taskInstanceID,
FaultTolerancyBuffer recordBuffer) {
this.taskInstanceID = taskInstanceID;
this.recordBuffer = recordBuffer;
}
public void eventOccurred(AbstractTaskEvent event) {
AckEvent ackEvent = (AckEvent) event;
String recordId = ackEvent.getRecordId();
String ackCID = recordId.split("-", 2)[0];
if (ackCID.equals(taskInstanceID)) {
System.out.println("Ack recieved " + ackEvent.getRecordId());
recordBuffer.ackRecord(ackEvent.getRecordId());
System.out.println(recordBuffer.getRecordBuffer());
System.out.println("---------------------");
System.out.println("--------------");
}
}
......
......@@ -10,24 +10,23 @@ import java.util.SortedMap;
import java.util.TreeMap;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.Record;
public class FaultTolerancyBuffer {
private final static long TIMEOUT = 1000;
private Long timeOfLastUpdate;
private Map<String, StreamRecord> recordBuffer;
private Map<String, Integer> ackCounter;
private SortedMap<Long, Set<String>> recordsByTime;
private Map<String, Long> recordTimestamps;
private List<RecordWriter<Record>> outputs;
private List<RecordWriter<StreamRecord>> outputs;
private String channelID;
private int numberOfOutputs;
public FaultTolerancyBuffer(List<RecordWriter<Record>> outputs,
public FaultTolerancyBuffer(List<RecordWriter<StreamRecord>> outputs,
String channelID) {
this.timeOfLastUpdate = System.currentTimeMillis();
this.outputs = outputs;
......@@ -36,7 +35,7 @@ public class FaultTolerancyBuffer {
this.numberOfOutputs = outputs.size();
this.channelID = channelID;
this.recordsByTime = new TreeMap<Long, Set<String>>();
this.recordTimestamps=new HashMap<String, Long>();
this.recordTimestamps = new HashMap<String, Long>();
}
public void addRecord(StreamRecord streamRecord) {
......@@ -45,15 +44,16 @@ public class FaultTolerancyBuffer {
ackCounter.put(streamRecord.getId(), numberOfOutputs);
addTimestamp(streamRecord.getId());
}
//TODO: use this method!
// TODO: use this method!
private void timeoutRecords() {
Long currentTime = System.currentTimeMillis();
if (timeOfLastUpdate + TIMEOUT < currentTime) {
List<String> timedOutRecords = new LinkedList<String>();
Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L, currentTime - TIMEOUT);
Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L, currentTime
- TIMEOUT);
for (Set<String> recordSet : timedOut.values()) {
if (!recordSet.isEmpty()) {
for (String recordID : recordSet) {
......@@ -61,32 +61,31 @@ public class FaultTolerancyBuffer {
}
}
}
recordsByTime.keySet().removeAll(timedOut.keySet());
for (String recordID: timedOutRecords) {
for (String recordID : timedOutRecords) {
failRecord(recordID);
}
}
}
public void addTimestamp(String recordID) {
Long currentTime = System.currentTimeMillis();
recordTimestamps.put(recordID,currentTime);
recordTimestamps.put(recordID, currentTime);
if (recordsByTime.containsKey(currentTime)) {
recordsByTime.get(currentTime).add(recordID);
} else {
Set<String> recordSet = new HashSet<String>();
recordSet.add(recordID);
recordsByTime.put(currentTime,recordSet);
recordsByTime.put(currentTime, recordSet);
}
System.out.println(currentTime.toString()+" : "+recordsByTime.get(currentTime).toString());
// System.out.println(currentTime.toString()+" : "+recordsByTime.get(currentTime).toString());
}
public Record popRecord(String recordID) {
System.out.println("Pop ID: "+recordID);
Record record = recordBuffer.get(recordID)
.getRecord();
public StreamRecord popRecord(String recordID) {
System.out.println("Pop ID: " + recordID);
StreamRecord record = recordBuffer.get(recordID);
removeRecord(recordID);
return record;
}
......@@ -94,9 +93,18 @@ public class FaultTolerancyBuffer {
private void removeRecord(String recordID) {
recordBuffer.remove(recordID);
ackCounter.remove(recordID);
recordsByTime.get(recordTimestamps.remove(recordID)).remove(recordID);
try {
Long ts = recordTimestamps.remove(recordID);
recordsByTime.get(
ts)
.remove(recordID); }
catch(Exception e){
System.out.println(e.getMessage());
System.out.println(recordID);
}
}
public void ackRecord(String recordID) {
if (ackCounter.containsKey(recordID)) {
......@@ -113,18 +121,17 @@ public class FaultTolerancyBuffer {
public void failRecord(String recordID) {
// Create new id to avoid double counting acks
System.out.println("Fail ID: "+recordID);
StreamRecord newRecord = new StreamRecord(popRecord(recordID), channelID)
.addId();
addRecord(newRecord);
reEmit(newRecord.getRecordWithId());
System.out.println("Fail ID: " + recordID);
StreamRecord newRecord = popRecord(recordID)
.setId(channelID);
reEmit(newRecord);
}
public void reEmit(Record record) {
for (RecordWriter<Record> output : outputs) {
public void reEmit(StreamRecord record) {
for (RecordWriter<StreamRecord> output : outputs) {
try {
output.emit(record);
System.out.println("Re-emitted");
} catch (Exception e) {
System.out.println("Re-emit failed");
}
......
......@@ -39,7 +39,6 @@ import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
import eu.stratosphere.streaming.partitioner.ShufflePartitioner;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
public class JobGraphBuilder {
......@@ -89,7 +88,7 @@ public class JobGraphBuilder {
private void connect(String upStreamComponentName,
String downStreamComponentName,
Class<? extends ChannelSelector<Record>> PartitionerClass,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass,
ChannelType channelType) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
......
package eu.stratosphere.streaming.api;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Random;
import eu.stratosphere.types.Record;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
//TODO: refactor access modifiers
public class StreamRecord implements IOReadableWritable, Serializable {
private static final long serialVersionUID = 1L;
public final class StreamRecord {
private Record record;
private Value[] fields;
private StringValue uid = new StringValue("");
private String channelID = "";
private int numOfFields;
public StreamRecord(Record record) {
this.record = record.createCopy();
public StreamRecord() {
this.numOfFields = 1;
fields = new Value[1];
//setId();
}
public StreamRecord(Record record, String channelID) {
this.record = record.createCopy();
this.channelID = channelID;
public StreamRecord(int length) {
this.numOfFields = length;
fields = new Value[length];
// setId();
}
public StreamRecord(int length, String channelID) {
this(length);
setChannelId(channelID);
}
public StreamRecord(Value value) {
this(1);
fields[0] = value;
}
public int getNumOfFields() {
return numOfFields;
}
public StreamRecord addId() {
public StreamRecord setId(String channelID) {
Random rnd = new Random();
uid.setValue(channelID + "-" + rnd.nextInt(1000));
record.addField(uid);
uid.setValue(channelID + "-" + rnd.nextInt());
return this;
}
public String popId() {
record.getFieldInto(record.getNumFields() - 1, uid);
record.removeField(record.getNumFields() - 1);
public String getId() {
return uid.getValue();
}
public String getId() {
record.getFieldInto(record.getNumFields() - 1, uid);
return uid.getValue();
public Value getField(int fieldNumber) {
return fields[fieldNumber];
}
public Record getRecord() {
Record newRecord=this.record.createCopy();
newRecord.removeField(newRecord.getNumFields() - 1);
return newRecord;
// public void getFieldInto(int fieldNumber, Value value) {
// value = fields[fieldNumber];
// }
public void setField(int fieldNumber, Value value) {
fields[fieldNumber] = value;
}
public Record getRecordWithId() {
return this.record;
@Override
public void write(DataOutput out) throws IOException {
uid.write(out);
// Write the number of fields with an IntValue
(new IntValue(numOfFields)).write(out);
// Write the fields
for (int i = 0; i < numOfFields; i++) {
(new StringValue(fields[i].getClass().getName())).write(out);
fields[i].write(out);
}
}
// TODO:write proper toString
@Override
public void read(DataInput in) throws IOException {
uid.read(in);
// Get the number of fields
IntValue numOfFieldsValue = new IntValue(0);
numOfFieldsValue.read(in);
numOfFields = numOfFieldsValue.getValue();
// Make sure the fields have numOfFields elements
fields = new Value[numOfFields];
// Read the fields
for (int i = 0; i < numOfFields; i++) {
StringValue stringValue = new StringValue("");
stringValue.read(in);
try {
fields[i] = (Value) Class.forName(stringValue.getValue()).newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
fields[i].read(in);
}
}
public StreamRecord newInstance() {
return new StreamRecord(0);
}
public StreamRecord setChannelId(String channelID) {
this.channelID = channelID;
return this;
}
// TODO: fix this method to work properly for non StringValue types
public String toString() {
StringBuilder outputString = new StringBuilder();
StringValue output = new StringValue("");
StringValue output; // = new StringValue("");
for (int i = 0; i < this.getNumOfFields(); i++) {
try {
output = (StringValue) fields[i];
outputString.append(output.getValue() + "*");
} catch (ClassCastException e) {
outputString.append("PRINT_ERROR*");
}
for (int i = 0; i < this.record.getNumFields(); i++) {
this.record.getFieldInto(i, output);
outputString.append(output.getValue() + "*");
}
return outputString.toString();
}
}
......@@ -15,15 +15,14 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.types.Record;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.StringValue;
public class DefaultSinkInvokable implements UserSinkInvokable {
@Override
public void invoke(Record record) throws Exception {
StringValue value = new StringValue("");
record.getFieldInto(0, value);
public void invoke(StreamRecord record) throws Exception {
StringValue value = (StringValue) record.getField(0);
System.out.println(value.getValue());
}
......
......@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.types.Record;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.StringValue;
public class DefaultSourceInvokable extends UserSourceInvokable {
......@@ -26,7 +26,7 @@ public class DefaultSourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (CharSequence word : mottoArray) {
emit(new Record(new StringValue(word)));
emit(new StreamRecord(new StringValue(word)));
}
}
......
......@@ -15,12 +15,12 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.types.Record;
import eu.stratosphere.streaming.api.StreamRecord;
public class DefaultTaskInvokable extends UserTaskInvokable {
@Override
public void invoke(Record record) throws Exception {
public void invoke(StreamRecord record) throws Exception {
emit(record);
}
}
\ No newline at end of file
package eu.stratosphere.streaming.api.invokable;
import java.util.List;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.Record;
public abstract class StreamInvokable {
private List<RecordWriter<Record>> outputs;
private List<RecordWriter<StreamRecord>> outputs;
protected String channelID;
private FaultTolerancyBuffer emittedRecords;
public final void declareOutputs(List<RecordWriter<Record>> outputs,
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs,
String channelID, FaultTolerancyBuffer emittedRecords) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
}
public final void emit(Record record) {
public final void emit(StreamRecord record) {
StreamRecord streamRecord = new StreamRecord(record, channelID).addId();
emittedRecords.addRecord(streamRecord);
record.setId(channelID);
emittedRecords.addRecord(record);
for (RecordWriter<Record> output : outputs) {
for (RecordWriter<StreamRecord> output : outputs) {
try {
output.emit(streamRecord.getRecordWithId());
output.emit(record);
System.out.println(this.getClass().getName());
System.out.println("Emitted " + streamRecord.getId() + "-"
+ streamRecord.toString());
System.out.println("Emitted " + record.getId() + "-"
+ record.toString());
System.out.println("---------------------");
} catch (Exception e) {
System.out.println("Emit error");
emittedRecords.failRecord(streamRecord.getId());
System.out.println("Emit error: " + e.getMessage());
emittedRecords.failRecord(record.getId());
}
}
}
......
......@@ -15,8 +15,8 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.types.Record;
import eu.stratosphere.streaming.api.StreamRecord;
public interface UserSinkInvokable {
public void invoke(Record record) throws Exception;
public void invoke(StreamRecord record) throws Exception;
}
\ No newline at end of file
......@@ -15,13 +15,11 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.types.Record;
import eu.stratosphere.streaming.api.StreamRecord;
public abstract class UserTaskInvokable extends StreamInvokable {
public void invoke(Record record) throws Exception {
public void invoke(StreamRecord record) throws Exception {
}
}
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.IOException;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Random;
......@@ -15,6 +17,7 @@ import eu.stratosphere.streaming.api.AckEventListener;
import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.FailEventListener;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamInvokable;
......@@ -22,67 +25,63 @@ import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public final class StreamComponentHelper<T extends AbstractInvokable> {
private Random random = new Random();
public void setAckListener(FaultTolerancyBuffer recordBuffer,
String sourceInstanceID, List<RecordWriter<Record>> outputs) {
String sourceInstanceID, List<RecordWriter<StreamRecord>> outputs) {
EventListener eventListener = new AckEventListener(sourceInstanceID,
recordBuffer);
for (RecordWriter<Record> output : outputs) {
for (RecordWriter<StreamRecord> output : outputs) {
// TODO: separate outputs
output.subscribeToEvent(eventListener, AckEvent.class);
}
}
public void setFailListener(FaultTolerancyBuffer recordBuffer,
String sourceInstanceID, List<RecordWriter<Record>> outputs) {
String sourceInstanceID, List<RecordWriter<StreamRecord>> outputs) {
EventListener eventListener = new FailEventListener(sourceInstanceID,
recordBuffer);
for (RecordWriter<Record> output : outputs) {
for (RecordWriter<StreamRecord> output : outputs) {
// TODO: separate outputs
output.subscribeToEvent(eventListener, FailEvent.class);
}
}
public void setConfigInputs(T taskBase, Configuration taskConfiguration,
List<RecordReader<Record>> inputs) throws Exception {
List<RecordReader<StreamRecord>> inputs) throws Exception {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
if (taskBase instanceof StreamTask) {
inputs.add(new RecordReader<Record>((StreamTask) taskBase,
Record.class));
inputs.add(new RecordReader<StreamRecord>((StreamTask) taskBase,
StreamRecord.class));
} else if (taskBase instanceof StreamSink) {
inputs.add(new RecordReader<Record>((StreamSink) taskBase,
Record.class));
inputs.add(new RecordReader<StreamRecord>((StreamSink) taskBase,
StreamRecord.class));
} else {
throw new Exception(
"Nonsupported object passed to setConfigInputs");
throw new Exception("Nonsupported object passed to setConfigInputs");
}
}
}
public void setConfigOutputs(T taskBase, Configuration taskConfiguration,
List<RecordWriter<Record>> outputs,
List<ChannelSelector<Record>> partitioners) throws Exception {
int numberOfOutputs = taskConfiguration
.getInteger("numberOfOutputs", 0);
List<RecordWriter<StreamRecord>> outputs,
List<ChannelSelector<StreamRecord>> partitioners) throws Exception {
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
setPartitioner(taskConfiguration, i, partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
for (ChannelSelector<StreamRecord> outputPartitioner : partitioners) {
if (taskBase instanceof StreamTask) {
outputs.add(new RecordWriter<Record>((StreamTask) taskBase,
Record.class, outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>((StreamTask) taskBase,
StreamRecord.class, outputPartitioner));
} else if (taskBase instanceof StreamSource) {
outputs.add(new RecordWriter<Record>((StreamSource) taskBase,
Record.class, outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>((StreamSource) taskBase,
StreamRecord.class, outputPartitioner));
} else {
throw new Exception(
"Nonsupported object passed to setConfigOutputs");
throw new Exception("Nonsupported object passed to setConfigOutputs");
}
}
}
......@@ -103,7 +102,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
public StreamInvokable getUserFunction(Configuration taskConfiguration,
List<RecordWriter<Record>> outputs, String instanceID,
List<RecordWriter<StreamRecord>> outputs, String instanceID,
FaultTolerancyBuffer recordBuffer) {
// Default value is a TaskInvokable even if it was called from a source
......@@ -121,42 +120,43 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
return userFunction;
}
//TODO: use TCP-like waiting
public void threadSafePublish (AbstractTaskEvent e, RecordReader<Record> input) throws InterruptedException {
// TODO: use TCP-like waiting
public void threadSafePublish(AbstractTaskEvent event,
RecordReader<StreamRecord> input) throws InterruptedException,
IOException {
boolean concurrentModificationOccured = false;
while (!concurrentModificationOccured) {
try {
input.publishEvent(e);
input.publishEvent(event);
concurrentModificationOccured = true;
} catch (Exception exeption) {
Thread.sleep(random.nextInt(50));
} catch (ConcurrentModificationException exeption) {
System.out.println("waiting...");
}
}
}
private void setPartitioner(Configuration taskConfiguration, int nrOutput,
List<ChannelSelector<Record>> partitioners) {
Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration
.getClass("partitionerClass_" + nrOutput,
DefaultPartitioner.class, ChannelSelector.class);
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = taskConfiguration
.getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class,
ChannelSelector.class);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger(
"partitionerIntParam_" + nrOutput, 1);
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_"
+ nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass(
"partitionerClassParam_" + nrOutput, StringValue.class,
Key.class);
"partitionerClassParam_" + nrOutput, StringValue.class, Key.class);
partitioners.add(partitioner.getConstructor(int.class,
Class.class).newInstance(keyPosition, keyClass));
partitioners.add(partitioner.getConstructor(int.class, Class.class)
.newInstance(keyPosition, keyClass));
} else {
partitioners.add(partitioner.newInstance());
}
} catch (Exception e) {
System.out.println("partitioner error" + " " + "partitioner_"
+ nrOutput);
System.out.println("partitioner error" + " " + "partitioner_" + nrOutput);
System.out.println(e);
}
}
......
......@@ -25,17 +25,16 @@ import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.Record;
public class StreamSink extends AbstractOutputTask {
private List<RecordReader<Record>> inputs;
private List<RecordReader<StreamRecord>> inputs;
private UserSinkInvokable userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
inputs = new LinkedList<RecordReader<Record>>();
inputs = new LinkedList<RecordReader<StreamRecord>>();
userFunction = null;
streamSinkHelper = new StreamComponentHelper<StreamSink>();
}
......@@ -57,13 +56,13 @@ public class StreamSink extends AbstractOutputTask {
boolean hasInput = true;
while (hasInput) {
hasInput = false;
for (RecordReader<Record> input : inputs) {
for (RecordReader<StreamRecord> input : inputs) {
if (input.hasNext()) {
hasInput = true;
StreamRecord rec = new StreamRecord(input.next());
StreamRecord rec = input.next();
String id = rec.getId();
try {
userFunction.invoke(rec.getRecord());
userFunction.invoke(rec);
streamSinkHelper.threadSafePublish(new AckEvent(id), input);
} catch (Exception e) {
streamSinkHelper.threadSafePublish(new FailEvent(id), input);
......
......@@ -23,14 +23,14 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.test.RandIS;
import eu.stratosphere.types.Record;
public class StreamSource extends AbstractInputTask<RandIS> {
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private List<RecordWriter<StreamRecord>> outputs;
private List<ChannelSelector<StreamRecord>> partitioners;
private UserSourceInvokable userFunction;
private static int numSources = 0;
private String sourceInstanceID;
......@@ -38,8 +38,8 @@ public class StreamSource extends AbstractInputTask<RandIS> {
public StreamSource() {
// TODO: Make configuration file visible and call setClassInputs() here
outputs = new LinkedList<RecordWriter<Record>>();
partitioners = new LinkedList<ChannelSelector<Record>>();
outputs = new LinkedList<RecordWriter<StreamRecord>>();
partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null;
numSources++;
sourceInstanceID = Integer.toString(numSources);
......@@ -62,20 +62,17 @@ public class StreamSource extends AbstractInputTask<RandIS> {
StreamComponentHelper<StreamSource> streamSourceHelper = new StreamComponentHelper<StreamSource>();
try {
streamSourceHelper.setConfigOutputs(this, taskConfiguration,
outputs, partitioners);
streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs,
partitioners);
} catch (Exception e) {
e.printStackTrace();
}
recordBuffer = new FaultTolerancyBuffer(outputs, sourceInstanceID);
userFunction = (UserSourceInvokable) streamSourceHelper
.getUserFunction(taskConfiguration, outputs, sourceInstanceID,
recordBuffer);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID,
outputs);
streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID,
outputs);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(
taskConfiguration, outputs, sourceInstanceID, recordBuffer);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs);
}
......@@ -84,7 +81,6 @@ public class StreamSource extends AbstractInputTask<RandIS> {
userFunction.invoke();
System.out.println(this.getClass().getName() + "-" + sourceInstanceID);
System.out.println(recordBuffer.getRecordBuffer());
System.out.println("---------------------");
}
......
......@@ -28,13 +28,12 @@ import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.Record;
public class StreamTask extends AbstractTask {
private List<RecordReader<Record>> inputs;
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private List<RecordReader<StreamRecord>> inputs;
private List<RecordWriter<StreamRecord>> outputs;
private List<ChannelSelector<StreamRecord>> partitioners;
private UserTaskInvokable userFunction;
private static int numTasks = 0;
private String taskInstanceID = "";
......@@ -44,9 +43,9 @@ public class StreamTask extends AbstractTask {
public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here
inputs = new LinkedList<RecordReader<Record>>();
outputs = new LinkedList<RecordWriter<Record>>();
partitioners = new LinkedList<ChannelSelector<Record>>();
inputs = new LinkedList<RecordReader<StreamRecord>>();
outputs = new LinkedList<RecordWriter<StreamRecord>>();
partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null;
numTasks++;
taskInstanceID = Integer.toString(numTasks);
......@@ -77,14 +76,14 @@ public class StreamTask extends AbstractTask {
boolean hasInput = true;
while (hasInput) {
hasInput = false;
for (RecordReader<Record> input : inputs) {
for (RecordReader<StreamRecord> input : inputs) {
if (input.hasNext()) {
hasInput = true;
StreamRecord streamRecord = new StreamRecord(input.next());
StreamRecord streamRecord = input.next();
String id = streamRecord.getId();
// TODO create method for concurrent publishing
try {
userFunction.invoke(streamRecord.getRecord());
userFunction.invoke(streamRecord);
streamTaskHelper.threadSafePublish(new AckEvent(id), input);
} catch (Exception e) {
streamTaskHelper.threadSafePublish(new FailEvent(id), input);
......
......@@ -16,12 +16,12 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Record;
import eu.stratosphere.streaming.api.StreamRecord;
public class BroadcastPartitioner implements ChannelSelector<Record> {
public class BroadcastPartitioner implements ChannelSelector<StreamRecord> {
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
int[] returnChannels = new int[numberOfOutputChannels];
for (int i = 0; i < numberOfOutputChannels; i++) {
......
......@@ -16,12 +16,12 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Record;
import eu.stratosphere.streaming.api.StreamRecord;
public class DefaultPartitioner implements ChannelSelector<Record> {
public class DefaultPartitioner implements ChannelSelector<StreamRecord> {
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
return new BroadcastPartitioner().selectChannels(record, numberOfOutputChannels);
......
......@@ -16,11 +16,11 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
//Grouping by a key
public class FieldsPartitioner implements ChannelSelector<Record> {
public class FieldsPartitioner implements ChannelSelector<StreamRecord> {
private int keyPosition;
private Class<? extends Key> keyClass;
......@@ -32,7 +32,7 @@ public class FieldsPartitioner implements ChannelSelector<Record> {
}
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
Key key = null;
try {
key = keyClass.newInstance();
......@@ -41,7 +41,7 @@ public class FieldsPartitioner implements ChannelSelector<Record> {
} catch (IllegalAccessException e) {
e.printStackTrace();
}
record.getFieldInto(keyPosition, key);
key = keyClass.cast(record.getField(keyPosition));
return new int[] { Math.abs(key.hashCode()) % numberOfOutputChannels };
}
}
......@@ -16,13 +16,13 @@
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Record;
import eu.stratosphere.streaming.api.StreamRecord;
//Randomly group, to distribute equally
public class GlobalPartitioner implements ChannelSelector<Record> {
public class GlobalPartitioner implements ChannelSelector<StreamRecord> {
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
return new int[]{0};
}
}
......@@ -18,13 +18,13 @@ package eu.stratosphere.streaming.partitioner;
import java.util.Random;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Record;
import eu.stratosphere.streaming.api.StreamRecord;
//Randomly group, to distribute equally
public class ShufflePartitioner implements ChannelSelector<Record> {
public class ShufflePartitioner implements ChannelSelector<StreamRecord> {
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
Random random = new Random();
return new int[]{random.nextInt(numberOfOutputChannels)};
}
......
......@@ -15,22 +15,22 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
public class QuerySourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 5; i++) {
Record record1 = new Record(3);
StreamRecord record1 = new StreamRecord(3);
record1.setField(0, new IntValue(5));
record1.setField(1, new LongValue(510));
record1.setField(2, new LongValue(100));
Record record2 = new Record(3);
StreamRecord record2 = new StreamRecord(3);
record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(510));
record2.setField(2, new LongValue(100));
......
......@@ -15,16 +15,15 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class TestSinkInvokable implements UserSinkInvokable {
@Override
public void invoke(Record record) throws Exception {
StringValue value = new StringValue("");
record.getFieldInto(0, value);
public void invoke(StreamRecord record) throws Exception {
StringValue value = (StringValue) record.getField(0);
System.out.println(value.getValue());
}
......
......@@ -15,20 +15,20 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
public class TestSourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 10; i++) {
Record record1 = new Record(2);
StreamRecord record1 = new StreamRecord(2);
record1.setField(0, new IntValue(5));
record1.setField(1, new LongValue(500));
Record record2 = new Record(2);
StreamRecord record2 = new StreamRecord(2);
record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(500));
emit(record1);
......
......@@ -15,11 +15,11 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.test.cellinfo.WorkerEngineExact;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class TestTaskInvokable extends UserTaskInvokable {
......@@ -27,22 +27,19 @@ public class TestTaskInvokable extends UserTaskInvokable {
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
@Override
public void invoke(Record record) throws Exception {
IntValue value1 = new IntValue(0);
record.getFieldInto(0, value1);
LongValue value2 = new LongValue(0);
record.getFieldInto(1, value2);
public void invoke(StreamRecord record) throws Exception {
IntValue value1 = (IntValue) record.getField(0);
LongValue value2 = (LongValue) record.getField(1);
// INFO
if (record.getNumFields() == 2) {
if (record.getNumOfFields() == 2) {
engine.put(value1.getValue(), value2.getValue());
emit(new Record(new StringValue(value1 + " " + value2)));
emit(new StreamRecord(new StringValue(value1 + " " + value2)));
}
// QUERY
else if (record.getNumFields() == 3) {
LongValue value3 = new LongValue(0);
record.getFieldInto(2, value3);
emit(new Record(new StringValue(String.valueOf(engine.get(
else if (record.getNumOfFields() == 3) {
LongValue value3 = (LongValue) record.getField(2);
emit(new StreamRecord(new StringValue(String.valueOf(engine.get(
value2.getValue(), value3.getValue(), value1.getValue())))));
}
}
......
package eu.stratosphere.streaming.test.window.wordcount;
public class IncrementalWindow {
private int currentTupleNum;
private int fullTupleNum;
private int slideTupleNum;
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.window.wordcount;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.types.StringValue;
public class WindowWordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1);
graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class);
graphBuilder.broadcastConnect("WindowWordCountSource", "WindowWordCountSplitter");
graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WindowWordCountCounter", "WindowWordCountSink");
return graphBuilder.getJobGraph();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.window.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize = 100;
private int slidingStep = 20;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private LongValue timestamp = new LongValue(0);
private String word = "";
private Record outputRecord = new Record(3);
private int count = 1;
@Override
public void invoke(Record record) throws Exception {
record.getFieldInto(0, wordValue);
word = wordValue.getValue();
record.getFieldInto(1, timestamp);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
countValue.setValue(count);
} else {
wordCounts.put(word, 1);
countValue.setValue(1);
}
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
outputRecord.setField(2, timestamp);
emit(outputRecord);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.window.wordcount;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class WindowWordCountSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
private IntValue count = new IntValue(1);
private LongValue timestamp = new LongValue(0);
@Override
public void invoke(Record record) throws Exception {
record.getFieldInto(0, word);
record.getFieldInto(1, count);
record.getFieldInto(2, timestamp);
System.out.println("============================================");
System.out.println(word.getValue() + " " + count.getValue() + " "
+ timestamp.getValue());
System.out.println("============================================");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.window.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class WindowWordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line;
private long timestamp;
private Record hamletRecord = null;
public WindowWordCountSource() {
try {
br = new BufferedReader(
new FileReader(
"src/main/java/eu/stratosphere/streaming/test/window/wordcount/hamlet.txt"));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void invoke() throws Exception {
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
timestamp = 0;
// while (line != null) {
for (int i = 0; i < 2; ++i) {
if (line != "") {
hamletRecord = new Record(new StringValue(line), new LongValue(timestamp));
System.out.println("========line number: " + timestamp + ", "
+ line + "==========");
emit(hamletRecord);
line = br.readLine();
++timestamp;
}
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.window.wordcount;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class WindowWordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private LongValue timestamp = new LongValue(0);
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private Record outputRecord = new Record(wordValue, timestamp);
@Override
public void invoke(Record record) throws Exception {
record.getFieldInto(0, sentence);
record.getFieldInto(1, timestamp);
System.out.println("************sentence=" + sentence.getValue() + ", timestamp="
+ timestamp.getValue()+"************");
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
outputRecord.setField(0, wordValue);
outputRecord.setField(1, timestamp);
emit(outputRecord);
}
}
}
\ No newline at end of file
......@@ -17,7 +17,6 @@ package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.types.StringValue;
......@@ -27,8 +26,8 @@ public class WordCount extends TestBase2 {
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountSource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 1);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.broadcastConnect("WordCountSource", "WordCountSplitter");
......
......@@ -17,10 +17,11 @@ package eu.stratosphere.streaming.test.wordcount;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class WordCountCounter extends UserTaskInvokable {
......@@ -29,25 +30,28 @@ public class WordCountCounter extends UserTaskInvokable {
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private String word = "";
private Record outputRecord = new Record(2);
private StreamRecord outputRecord = new StreamRecord(2);
private int count = 1;
@Override
public void invoke(Record record) throws Exception {
record.getFieldInto(0, wordValue);
public void invoke(StreamRecord record) throws Exception {
wordValue = (StringValue) record.getField(0);
word = wordValue.getValue();
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
countValue.setValue(count);
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
emit(outputRecord);
} else {
wordCounts.put(word, 1);
countValue.setValue(1);
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
emit(outputRecord);
}
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
emit(outputRecord);
}
}
......@@ -15,9 +15,9 @@
package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class WordCountSink implements UserSinkInvokable {
......@@ -26,10 +26,10 @@ public class WordCountSink implements UserSinkInvokable {
private IntValue count = new IntValue(1);
@Override
public void invoke(Record record) throws Exception {
public void invoke(StreamRecord record) throws Exception {
record.getFieldInto(0, word);
record.getFieldInto(1, count);
word = (StringValue) record.getField(0);
count = (IntValue) record.getField(1);
System.out.println(word.getValue() + " " + count.getValue());
......
......@@ -15,19 +15,19 @@
package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class WordCountSource extends UserSourceInvokable {
// private final String motto = "Stratosphere Big Data looks tiny from here";
private final String motto = "Gyuszi Marci Gabor Frank Fabian Stephan";
private final Record mottoRecord = new Record(new StringValue(motto));
private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
private final StreamRecord mottoRecord = new StreamRecord(new StringValue(motto));
@Override
public void invoke() throws Exception {
for (int i = 0; i < 2; i++) {
for (int i = 0; i < 10000; i++) {
emit(mottoRecord);
}
}
......
......@@ -15,8 +15,8 @@
package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class WordCountSplitter extends UserTaskInvokable {
......@@ -24,11 +24,13 @@ public class WordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private Record outputRecord = new Record(wordValue);
private StreamRecord outputRecord = new StreamRecord(wordValue);
@Override
public void invoke(Record record) throws Exception {
record.getFieldInto(0, sentence);
public void invoke(StreamRecord record) throws Exception {
//record.getFieldInto(0, sentence);
sentence = (StringValue) record.getField(0);
System.out.println("to split: " + sentence.getValue());
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
......
......@@ -136,7 +136,12 @@ public abstract class TestBase2 {
Assert.assertNotNull("Obtained null JobGraph", jobGraph);
try {
JobClient client = this.executor.getJobClient(jobGraph);
JobClient client = null;
try {
client = this.executor.getJobClient(jobGraph); }
catch(Exception e) {
System.err.println("here");
}
client.setConsoleStreamForReporting(getNullPrintStream());
this.jobExecutionResult = client.submitJobAndWait();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册