提交 73c4e336 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Added StreamRecord setter

上级 8fa43e82
.cache
.classpath
.idea
.metadata
.settings
.project
.version.properties
filter.properties
logs.zip
target
tmp
bin
*.class
*.iml
*.swp
*.jar
.DS_Store
......@@ -37,11 +37,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Creates a new empty batch of records and sets the field number to the given
* number
* Creates a new empty batch of records and sets the field number to the
* given number
*
* @param length
* Number of fields in the records
* Number of fields in the records
*/
public StreamRecord(int length) {
this.numOfFields = length;
......@@ -62,11 +62,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Given an array of Values, creates a new a record batch containing the array
* as its first element
* Given an array of Values, creates a new a record batch containing the
* array as its first element
*
* @param values
* Array containing the Values for the first record in the batch
* Array containing the Values for the first record in the batch
*/
public StreamRecord(Value... values) {
numOfFields = values.length;
......@@ -95,7 +95,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Set the ID of the StreamRecord object
*
* @param channelID
* ID of the emitting task
* ID of the emitting task
* @return The StreamRecord object
*/
public StreamRecord setId(String channelID) {
......@@ -113,35 +113,50 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Returns the Value of a field in the given position of a specific record in
* the batch
* Returns the Value of a field in the given position of a specific record
* in the batch
*
* @param recordNumber
* Position of the record in the batch
* Position of the record in the batch
* @param fieldNumber
* Position of the field in the record
* Position of the field in the record
* @return Value of the field
*/
public Value getField(int recordNumber, int fieldNumber) {
return recordBatch.get(recordNumber)[fieldNumber];
}
// TODO: javadoc
public void setRecord(int recordNumber, Value... values) {
if (values.length == numOfRecords) {
recordBatch.set(recordNumber, values);
}
}
public void setRecord(Value... values) {
// TODO: consider clearing the List
recordBatch = new ArrayList<Value[]>(1);
if (values.length == numOfRecords) {
recordBatch.set(0, values);
}
}
/**
*
* @param recordNumber
* Position of the record in the batch
* @return AtomRecord object containing the fields of the record
* Position of the record in the batch
* @return Value array containing the fields of the record
*/
public AtomRecord getRecord(int recordNumber) {
return new AtomRecord(recordBatch.get(recordNumber));
public Value[] getRecord(int recordNumber) {
return recordBatch.get(recordNumber);
}
/**
* Checks if the number of fields are equal to the batch field size then adds
* the AtomRecord to the end of the batch
* Checks if the number of fields are equal to the batch field size then
* adds the AtomRecord to the end of the batch
*
* @param record
* Record to be added
* Record to be added
*/
public void addRecord(AtomRecord record) {
Value[] fields = record.getFields();
......@@ -152,11 +167,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
* Checks if the number of fields are equal to the batch field size then adds
* the Value Arrray to the end of the batch
* Checks if the number of fields are equal to the batch field size then
* adds the Value array to the end of the batch
*
* @param record
* Value array to be added as the next record of the batch
* Value array to be added as the next record of the batch
*/
public void addRecord(Value[] fields) {
if (fields.length == numOfFields) {
......@@ -209,8 +224,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
StringValue stringValue = new StringValue("");
stringValue.read(in);
try {
record[i] = (Value) Class.forName(stringValue.getValue())
.newInstance();
record[i] = (Value) Class.forName(stringValue.getValue()).newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
......
......@@ -24,10 +24,9 @@ public abstract class StreamInvokable {
record.setId(channelID);
emittedRecords.addRecord(record);
for (RecordWriter<StreamRecord> output : outputs) {
try {
output.emit(record);
System.out.println(this.getClass().getName());
......
......@@ -25,20 +25,15 @@ public class BatchWordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("BatchWordCountSource",
BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter",
BatchWordCountSplitter.class, 2);
graphBuilder.setTask("BatchWordCountCounter",
BatchWordCountCounter.class, 2);
graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2);
graphBuilder.setSink("BatchWordCountSink", BatchWordCountSink.class);
graphBuilder.shuffleConnect("BatchWordCountSource",
"BatchWordCountSplitter");
graphBuilder.fieldsConnect("BatchWordCountSplitter",
"BatchWordCountCounter", 0, StringValue.class);
graphBuilder.shuffleConnect("BatchWordCountCounter",
"BatchWordCountSink");
graphBuilder.shuffleConnect("BatchWordCountSource", "BatchWordCountSplitter");
graphBuilder.fieldsConnect("BatchWordCountSplitter", "BatchWordCountCounter", 0,
StringValue.class);
graphBuilder.shuffleConnect("BatchWordCountCounter", "BatchWordCountSink");
return graphBuilder.getJobGraph();
}
......
......@@ -18,6 +18,7 @@ package eu.stratosphere.streaming.test.batch.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.IntValue;
......@@ -27,10 +28,10 @@ import eu.stratosphere.types.StringValue;
public class BatchWordCountCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue();
private IntValue countValue = new IntValue();
private LongValue timestamp = new LongValue();
private String word = new String();
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private LongValue timestamp = new LongValue(0);
private String word = "";
private int count = 1;
@Override
......
......@@ -23,15 +23,15 @@ import eu.stratosphere.types.StringValue;
public class BatchWordCountSink implements UserSinkInvokable {
private StringValue word = new StringValue();
private IntValue count = new IntValue();
private LongValue timestamp = new LongValue();
private StringValue word = new StringValue("");
private IntValue count = new IntValue(1);
private LongValue timestamp = new LongValue(0);
@Override
public void invoke(StreamRecord record) throws Exception {
word = (StringValue) record.getField(0, 0);
count = (IntValue) record.getField(0, 1);
timestamp = (LongValue) record.getField(0, 2);
word=(StringValue) record.getField(0, 0);
count=(IntValue) record.getField(0, 1);
timestamp=(LongValue) record.getField(0, 2);
System.out.println("============================================");
System.out.println(word.getValue() + " " + count.getValue() + " "
+ timestamp.getValue());
......
......@@ -15,57 +15,29 @@
package eu.stratosphere.streaming.test.batch.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class BatchWordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line = new String();
private StringValue lineValue = new StringValue();
private LongValue timestampValue = new LongValue();
private Value[] values = new Value[2];
private final static int BATCH_SIZE = 10;
private long timestamp = 0;
public BatchWordCountSource() {
try {
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
private long timestamp;
private StreamRecord mottoRecords = new StreamRecord(2);
@Override
public void invoke() throws Exception {
timestamp = 0;
StreamRecord mottoRecords = new StreamRecord(2);
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
lineValue.setValue(line);
timestampValue.setValue(timestamp);
values[0] = lineValue;
values[1] = timestampValue;
mottoRecords.addRecord(values);
timestamp++;
if (timestamp % BATCH_SIZE == 0) {
emit(mottoRecords);
mottoRecords = new StreamRecord(2);
}
for (int i = 0; i < 100; ++i) {
AtomRecord mottoRecord = new AtomRecord(2);
mottoRecord.setField(0, new StringValue(motto));
mottoRecord.setField(1, new LongValue(timestamp));
mottoRecords.addRecord(mottoRecord);
++timestamp;
if (timestamp % 10 == 0) {
emit(mottoRecords);
}
line = br.readLine();
}
}
......
......@@ -15,6 +15,7 @@
package eu.stratosphere.streaming.test.batch.wordcount;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.LongValue;
......@@ -22,21 +23,24 @@ import eu.stratosphere.types.StringValue;
public class BatchWordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue();
private LongValue timestamp = new LongValue();
private String[] words = new String[] {};
private StringValue wordValue = new StringValue();
private StringValue sentence = new StringValue("");
private LongValue timestamp = new LongValue(0);
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private AtomRecord outputRecord = new AtomRecord(2);
@Override
public void invoke(StreamRecord record) throws Exception {
int numberOfRecords = record.getNumOfRecords();
for (int i = 0; i < numberOfRecords; ++i) {
for(int i=0; i<numberOfRecords; ++i){
sentence = (StringValue) record.getField(i, 0);
timestamp = (LongValue) record.getField(i, 1);
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
emit(new StreamRecord(wordValue, timestamp));
outputRecord.setField(0, wordValue);
outputRecord.setField(1, timestamp);
emit(new StreamRecord(outputRecord));
}
}
}
......
......@@ -32,7 +32,7 @@ public class CellInfo extends TestBase2 {
graphBuilder.fieldsConnect("infoSource", "cellTask", 0, IntValue.class);
graphBuilder.fieldsConnect("querySource", "cellTask",0, IntValue.class);
graphBuilder.shuffleConnect("cellTask", "sink");
graphBuilder.broadcastConnect("cellTask", "sink");
return graphBuilder.getJobGraph();
}
......
......@@ -17,6 +17,5 @@ package eu.stratosphere.streaming.test.cellinfo;
public interface IWorkerEngine {
public int get(long timeStamp, long lastMillis, int cellId);
public void put(int cellId, long timeStamp);
}
......@@ -30,10 +30,10 @@ public class WindowWordCountCounter extends UserTaskInvokable {
private int slidingStep = 20;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue();
private IntValue countValue = new IntValue();
private LongValue timestamp = new LongValue();
private String word = new String();
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private LongValue timestamp = new LongValue(0);
private String word = "";
private int count = 1;
@Override
......
......@@ -33,7 +33,7 @@ public class WordCountCounter extends UserTaskInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue = (StringValue) record.getRecord(0).getField(0);
wordValue = (StringValue) record.getRecord(0)[0];
word = wordValue.getValue();
if (wordCounts.containsKey(word)) {
......
......@@ -9,17 +9,26 @@ public class WordCountDummySource extends UserSourceInvokable {
private String line = new String();
private StringValue lineValue = new StringValue();
private Value[] values = new Value[1];
public WordCountDummySource() {
line = "first second";
lineValue.setValue(line);
}
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1; i++) {
emit(new StreamRecord(lineValue));
System.out.println("xxxxxxxxx");
}
line = "first one";
lineValue.setValue(line);
values[0] = lineValue;
StreamRecord record = new StreamRecord(lineValue);
emit(record);
line = "second two";
lineValue.setValue(line);
values[0] = lineValue;
record.setRecord(0, values);
emit(record);
}
}
......@@ -27,7 +27,7 @@ public class WordCountSplitter extends UserTaskInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
sentence = (StringValue) record.getRecord(0).getField(0);
sentence = (StringValue) record.getRecord(0)[0];
System.out.println("to split: " + sentence.getValue());
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册