提交 7bedeba7 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] batchwordcount updated for tuple

上级 7941e5cd
......@@ -58,6 +58,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord() {
}
public StreamRecord(int numOfFields) {
this.numOfFields = numOfFields;
this.numOfRecords = 0;
recordBatch = new ArrayList<Tuple>();
}
/**
* Creates a new empty batch of records and sets the field number to the
* given number, and the number of records to the given number. Setting
......@@ -155,7 +162,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public String getString(int fieldNumber) {
try {
return (String) recordBatch.get(0).getField(fieldNumber);
......@@ -163,7 +170,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public Integer getInteger(int fieldNumber) {
try {
return (Integer) recordBatch.get(0).getField(fieldNumber);
......@@ -171,7 +178,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public Long getLong(int fieldNumber) {
try {
return (Long) recordBatch.get(0).getField(fieldNumber);
......@@ -179,7 +186,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public Boolean getBoolean(int fieldNumber) {
try {
return (Boolean) recordBatch.get(0).getField(fieldNumber);
......@@ -187,7 +194,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public Double getDouble(int fieldNumber) {
try {
return (Double) recordBatch.get(0).getField(fieldNumber);
......@@ -195,7 +202,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public String getString(int recordNumber, int fieldNumber) {
try {
return (String) recordBatch.get(recordNumber).getField(fieldNumber);
......@@ -203,7 +210,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public Integer getInteger(int recordNumber, int fieldNumber) {
try {
return (Integer) recordBatch.get(recordNumber).getField(fieldNumber);
......@@ -211,7 +218,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public Long getLong(int recordNumber, int fieldNumber) {
try {
return (Long) recordBatch.get(recordNumber).getField(fieldNumber);
......@@ -219,7 +226,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public Boolean getBoolean(int recordNumber, int fieldNumber) {
try {
return (Boolean) recordBatch.get(recordNumber).getField(fieldNumber);
......@@ -227,7 +234,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
public Double getDouble(int recordNumber, int fieldNumber) {
try {
return (Double) recordBatch.get(recordNumber).getField(fieldNumber);
......@@ -235,7 +242,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchFieldException());
}
}
/**
* Sets a field in the given position of a specific record in the batch
......@@ -254,7 +260,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setString(int recordNumber, int fieldNumber, String o) {
try {
recordBatch.get(recordNumber).setField(o, fieldNumber);
......@@ -262,7 +268,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setInteger(int recordNumber, int fieldNumber, Integer o) {
try {
recordBatch.get(recordNumber).setField(o, fieldNumber);
......@@ -270,7 +276,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setLong(int recordNumber, int fieldNumber, Long o) {
try {
recordBatch.get(recordNumber).setField(o, fieldNumber);
......@@ -278,7 +284,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setDouble(int recordNumber, int fieldNumber, Double o) {
try {
recordBatch.get(recordNumber).setField(o, fieldNumber);
......@@ -286,7 +292,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setBoolean(int recordNumber, int fieldNumber, Boolean o) {
try {
recordBatch.get(recordNumber).setField(o, fieldNumber);
......@@ -294,7 +300,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setString(int fieldNumber, String o) {
try {
recordBatch.get(0).setField(o, fieldNumber);
......@@ -302,7 +308,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setInteger(int fieldNumber, Integer o) {
try {
recordBatch.get(0).setField(o, fieldNumber);
......@@ -310,7 +316,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setLong(int fieldNumber, Long o) {
try {
recordBatch.get(0).setField(o, fieldNumber);
......@@ -318,7 +324,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setDouble(int fieldNumber, Double o) {
try {
recordBatch.get(0).setField(o, fieldNumber);
......@@ -326,7 +332,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw (new NoSuchRecordException());
}
}
public void setBoolean(int fieldNumber, Boolean o) {
try {
recordBatch.get(0).setField(o, fieldNumber);
......@@ -335,7 +341,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Sets a field in the given position of the first record in the batch
*
......@@ -371,13 +376,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public Tuple getRecord() {
return getRecord(0);
}
public void getTupleInto(Tuple tuple){
public void getTupleInto(Tuple tuple) {
if (tuple.getArity() == numOfFields) {
try {
Tuple source = recordBatch.get(0);
for(int i=0;i<numOfFields;i++){
for (int i = 0; i < numOfFields; i++) {
tuple.setField(source.getField(i), i);
}
} catch (IndexOutOfBoundsException e) {
......@@ -386,7 +391,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
} else {
throw (new RecordSizeMismatchException());
}
}
/**
......@@ -454,11 +459,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(buff);
StreamRecord newRecord = new StreamRecord();
StreamRecord newRecord = new StreamRecord();
try {
this.write(out);
DataInputStream in = new DataInputStream(new ByteArrayInputStream(buff.toByteArray()));
newRecord.read(in);
} catch (Exception e) {
}
......@@ -467,11 +472,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
private void writeTuple(Tuple tuple, DataOutput out) {
Class[] basicTypes = new Class[tuple.getArity()];
StringBuilder basicTypeNames = new StringBuilder();
//TODO: exception for empty record - no getField!
// TODO: exception for empty record - no getField!
for (int i = 0; i < basicTypes.length; i++) {
basicTypes[i] = tuple.getField(i).getClass();
basicTypeNames.append(basicTypes[i].getName() + ",");
......
......@@ -18,35 +18,33 @@ package eu.stratosphere.streaming.examples.batch.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class BatchWordCountCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue();
private IntValue countValue = new IntValue();
private LongValue timestamp = new LongValue();
private String word = new String();
private int count = 1;
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(new Tuple3<String, Integer, Long>());
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue = (StringValue) record.getField(0, 0);
timestamp = (LongValue) record.getField(0, 1);
word = record.getString(0);
timestamp = record.getLong(1);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
countValue.setValue(count);
} else {
count = 1;
wordCounts.put(word, 1);
countValue.setValue(1);
}
emit(new StreamRecord(wordValue, countValue, timestamp));
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
}
}
\ No newline at end of file
......@@ -31,20 +31,15 @@ public class BatchWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("BatchWordCountSource",
BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter",
BatchWordCountSplitter.class, 2);
graphBuilder.setTask("BatchWordCountCounter",
BatchWordCountCounter.class, 2);
graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2);
graphBuilder.setSink("BatchWordCountSink", BatchWordCountSink.class);
graphBuilder.shuffleConnect("BatchWordCountSource",
"BatchWordCountSplitter");
graphBuilder.fieldsConnect("BatchWordCountSplitter",
"BatchWordCountCounter", 0, StringValue.class);
graphBuilder.shuffleConnect("BatchWordCountCounter",
"BatchWordCountSink");
graphBuilder.shuffleConnect("BatchWordCountSource", "BatchWordCountSplitter");
graphBuilder.fieldsConnect("BatchWordCountSplitter", "BatchWordCountCounter", 0,
StringValue.class);
graphBuilder.shuffleConnect("BatchWordCountCounter", "BatchWordCountSink");
return graphBuilder.getJobGraph();
}
......@@ -68,8 +63,7 @@ public class BatchWordCountLocal {
exec.start();
Client client = new Client(new InetSocketAddress("localhost",
6498), configuration);
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
......@@ -78,8 +72,8 @@ public class BatchWordCountLocal {
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration);
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
......
......@@ -17,24 +17,20 @@ package eu.stratosphere.streaming.examples.batch.wordcount;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class BatchWordCountSink extends UserSinkInvokable {
private StringValue word = new StringValue();
private IntValue count = new IntValue();
private LongValue timestamp = new LongValue();
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
@Override
public void invoke(StreamRecord record) throws Exception {
word = (StringValue) record.getField(0, 0);
count = (IntValue) record.getField(0, 1);
timestamp = (LongValue) record.getField(0, 2);
word = record.getString(0);
count = record.getInteger(1);
timestamp = record.getLong(2);
System.out.println("============================================");
System.out.println(word.getValue() + " " + count.getValue() + " "
+ timestamp.getValue());
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
}
......
......@@ -19,27 +19,23 @@ import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class BatchWordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line = new String();
private StringValue lineValue = new StringValue();
private LongValue timestampValue = new LongValue();
private Value[] values = new Value[2];
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Long>());
private final static int BATCH_SIZE = 10;
private long timestamp = 0;
private Long timestamp = 0L;
public BatchWordCountSource() {
try {
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
......@@ -47,21 +43,19 @@ public class BatchWordCountSource extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
timestamp = 0;
StreamRecord mottoRecords = new StreamRecord(2);
timestamp = 0L;
outRecord = new StreamRecord(2);
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
lineValue.setValue(line);
timestampValue.setValue(timestamp);
values[0] = lineValue;
values[1] = timestampValue;
mottoRecords.addRecord(values);
outRecord.addRecord(new Tuple2<String, Long>(line, timestamp));
timestamp++;
if (timestamp % BATCH_SIZE == 0) {
emit(mottoRecords);
mottoRecords = new StreamRecord(2);
emit(outRecord);
outRecord = new StreamRecord(new Tuple2<String, Long>(), BATCH_SIZE);
}
}
line = br.readLine();
......
......@@ -15,28 +15,30 @@
package eu.stratosphere.streaming.examples.batch.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class BatchWordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue();
private LongValue timestamp = new LongValue();
private String[] words = new String[] {};
private StringValue wordValue = new StringValue();
private StreamRecord outputRecord = new StreamRecord(new Tuple2<String,Long>());
private Long timestamp =0L;
@Override
public void invoke(StreamRecord record) throws Exception {
int numberOfRecords = record.getNumOfRecords();
for (int i = 0; i < numberOfRecords; ++i) {
sentence = (StringValue) record.getField(i, 0);
timestamp = (LongValue) record.getField(i, 1);
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
emit(new StreamRecord(wordValue, timestamp));
words = record.getString(0).split(" ");
timestamp=record.getLong(1);
for (String word : words) {
outputRecord.setString(0, word);
outputRecord.setLong(1, timestamp);
emit(outputRecord);
}
}
}
......
......@@ -25,6 +25,7 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.types.StringValue;
public class WordCountLocal {
......@@ -36,10 +37,9 @@ public class WordCountLocal {
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.shuffleConnect("WordCountSplitter","WordCountCounter");
// graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class);
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
......
......@@ -27,7 +27,7 @@ public class WordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line = new String();
private Tuple1<String> lineTuple = new Tuple1<String>();
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
public WordCountSource() {
try {
......@@ -42,9 +42,9 @@ public class WordCountSource extends UserSourceInvokable {
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
lineTuple.setField(line, 0);
outRecord.setString(0,line);
// TODO: object reuse
emit(new StreamRecord(lineTuple));
emit(outRecord);
}
line = br.readLine();
}
......
......@@ -23,25 +23,29 @@ import eu.stratosphere.types.Key;
public class FieldsPartitioner implements ChannelSelector<StreamRecord> {
private int keyPosition;
private Class<? extends Key> keyClass;
// private Class<? extends Key> keyClass;
public FieldsPartitioner(int keyPosition, Class<? extends Key> keyClass) {
this.keyPosition = keyPosition;
this.keyClass = keyClass;
// this.keyClass = keyClass;
}
@Override
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
Key key = null;
try {
key = keyClass.newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
//TODO:fix this
// Key key = null;
// try {
// key = keyClass.newInstance();
// } catch (InstantiationException e) {
// e.printStackTrace();
// } catch (IllegalAccessException e) {
// e.printStackTrace();
// }
// TODO: consider hash partition the whole record batch.
key = keyClass.cast(record.getField(0, keyPosition));
return new int[] { Math.abs(key.hashCode()) % numberOfOutputChannels };
//
// }
// key = keyClass.cast(record.getField(0, keyPosition));
return new int[] { Math.abs(record.getField(0, keyPosition).hashCode()) % numberOfOutputChannels };
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册