From 7bedeba74f2b0bf650a411c743dcd5866c85cb56 Mon Sep 17 00:00:00 2001 From: gyfora Date: Mon, 14 Jul 2014 16:29:05 +0200 Subject: [PATCH] [streaming] batchwordcount updated for tuple --- .../api/streamrecord/StreamRecord.java | 67 ++++++++++--------- .../wordcount/BatchWordCountCounter.java | 24 +++---- .../batch/wordcount/BatchWordCountLocal.java | 26 +++---- .../batch/wordcount/BatchWordCountSink.java | 18 ++--- .../batch/wordcount/BatchWordCountSource.java | 30 ++++----- .../wordcount/BatchWordCountSplitter.java | 24 ++++--- .../examples/wordcount/WordCountLocal.java | 6 +- .../examples/wordcount/WordCountSource.java | 6 +- .../partitioner/FieldsPartitioner.java | 28 ++++---- 9 files changed, 111 insertions(+), 118 deletions(-) diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index 9de10c4b528..b69971af716 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -58,6 +58,13 @@ public class StreamRecord implements IOReadableWritable, Serializable { public StreamRecord() { } + public StreamRecord(int numOfFields) { + this.numOfFields = numOfFields; + this.numOfRecords = 0; + recordBatch = new ArrayList(); + + } + /** * Creates a new empty batch of records and sets the field number to the * given number, and the number of records to the given number. Setting @@ -155,7 +162,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - + public String getString(int fieldNumber) { try { return (String) recordBatch.get(0).getField(fieldNumber); @@ -163,7 +170,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - + public Integer getInteger(int fieldNumber) { try { return (Integer) recordBatch.get(0).getField(fieldNumber); @@ -171,7 +178,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - + public Long getLong(int fieldNumber) { try { return (Long) recordBatch.get(0).getField(fieldNumber); @@ -179,7 +186,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - + public Boolean getBoolean(int fieldNumber) { try { return (Boolean) recordBatch.get(0).getField(fieldNumber); @@ -187,7 +194,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - + public Double getDouble(int fieldNumber) { try { return (Double) recordBatch.get(0).getField(fieldNumber); @@ -195,7 +202,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - + public String getString(int recordNumber, int fieldNumber) { try { return (String) recordBatch.get(recordNumber).getField(fieldNumber); @@ -203,7 +210,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - + public Integer getInteger(int recordNumber, int fieldNumber) { try { return (Integer) recordBatch.get(recordNumber).getField(fieldNumber); @@ -211,7 +218,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - + public Long getLong(int recordNumber, int fieldNumber) { try { return (Long) recordBatch.get(recordNumber).getField(fieldNumber); @@ -219,7 +226,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - + public Boolean getBoolean(int recordNumber, int fieldNumber) { try { return (Boolean) recordBatch.get(recordNumber).getField(fieldNumber); @@ -227,7 +234,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - + public Double getDouble(int recordNumber, int fieldNumber) { try { return (Double) recordBatch.get(recordNumber).getField(fieldNumber); @@ -235,7 +242,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchFieldException()); } } - /** * Sets a field in the given position of a specific record in the batch @@ -254,7 +260,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchRecordException()); } } - + public void setString(int recordNumber, int fieldNumber, String o) { try { recordBatch.get(recordNumber).setField(o, fieldNumber); @@ -262,7 +268,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchRecordException()); } } - + public void setInteger(int recordNumber, int fieldNumber, Integer o) { try { recordBatch.get(recordNumber).setField(o, fieldNumber); @@ -270,7 +276,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchRecordException()); } } - + public void setLong(int recordNumber, int fieldNumber, Long o) { try { recordBatch.get(recordNumber).setField(o, fieldNumber); @@ -278,7 +284,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchRecordException()); } } - + public void setDouble(int recordNumber, int fieldNumber, Double o) { try { recordBatch.get(recordNumber).setField(o, fieldNumber); @@ -286,7 +292,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchRecordException()); } } - + public void setBoolean(int recordNumber, int fieldNumber, Boolean o) { try { recordBatch.get(recordNumber).setField(o, fieldNumber); @@ -294,7 +300,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchRecordException()); } } - + public void setString(int fieldNumber, String o) { try { recordBatch.get(0).setField(o, fieldNumber); @@ -302,7 +308,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchRecordException()); } } - + public void setInteger(int fieldNumber, Integer o) { try { recordBatch.get(0).setField(o, fieldNumber); @@ -310,7 +316,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchRecordException()); } } - + public void setLong(int fieldNumber, Long o) { try { recordBatch.get(0).setField(o, fieldNumber); @@ -318,7 +324,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchRecordException()); } } - + public void setDouble(int fieldNumber, Double o) { try { recordBatch.get(0).setField(o, fieldNumber); @@ -326,7 +332,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw (new NoSuchRecordException()); } } - + public void setBoolean(int fieldNumber, Boolean o) { try { recordBatch.get(0).setField(o, fieldNumber); @@ -335,7 +341,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { } } - /** * Sets a field in the given position of the first record in the batch * @@ -371,13 +376,13 @@ public class StreamRecord implements IOReadableWritable, Serializable { public Tuple getRecord() { return getRecord(0); } - - public void getTupleInto(Tuple tuple){ - + + public void getTupleInto(Tuple tuple) { + if (tuple.getArity() == numOfFields) { try { Tuple source = recordBatch.get(0); - for(int i=0;i wordCounts = new HashMap(); - private StringValue wordValue = new StringValue(); - private IntValue countValue = new IntValue(); - private LongValue timestamp = new LongValue(); - private String word = new String(); - private int count = 1; + private String word = ""; + private Integer count = 0; + private Long timestamp = 0L; + private StreamRecord outRecord = new StreamRecord(new Tuple3()); @Override public void invoke(StreamRecord record) throws Exception { - wordValue = (StringValue) record.getField(0, 0); - timestamp = (LongValue) record.getField(0, 1); + word = record.getString(0); + timestamp = record.getLong(1); if (wordCounts.containsKey(word)) { count = wordCounts.get(word) + 1; wordCounts.put(word, count); - countValue.setValue(count); } else { + count = 1; wordCounts.put(word, 1); - countValue.setValue(1); } - emit(new StreamRecord(wordValue, countValue, timestamp)); + outRecord.setString(0, word); + outRecord.setInteger(1, count); + outRecord.setLong(2, timestamp); } } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountLocal.java index 5e666075f97..4693cc0d076 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountLocal.java @@ -31,20 +31,15 @@ public class BatchWordCountLocal { public static JobGraph getJobGraph() { JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); - graphBuilder.setSource("BatchWordCountSource", - BatchWordCountSource.class); - graphBuilder.setTask("BatchWordCountSplitter", - BatchWordCountSplitter.class, 2); - graphBuilder.setTask("BatchWordCountCounter", - BatchWordCountCounter.class, 2); + graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class); + graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2); + graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2); graphBuilder.setSink("BatchWordCountSink", BatchWordCountSink.class); - graphBuilder.shuffleConnect("BatchWordCountSource", - "BatchWordCountSplitter"); - graphBuilder.fieldsConnect("BatchWordCountSplitter", - "BatchWordCountCounter", 0, StringValue.class); - graphBuilder.shuffleConnect("BatchWordCountCounter", - "BatchWordCountSink"); + graphBuilder.shuffleConnect("BatchWordCountSource", "BatchWordCountSplitter"); + graphBuilder.fieldsConnect("BatchWordCountSplitter", "BatchWordCountCounter", 0, + StringValue.class); + graphBuilder.shuffleConnect("BatchWordCountCounter", "BatchWordCountSink"); return graphBuilder.getJobGraph(); } @@ -68,8 +63,7 @@ public class BatchWordCountLocal { exec.start(); - Client client = new Client(new InetSocketAddress("localhost", - 6498), configuration); + Client client = new Client(new InetSocketAddress("localhost", 6498), configuration); client.run(jG, true); @@ -78,8 +72,8 @@ public class BatchWordCountLocal { } else if (args[0].equals("cluster")) { System.out.println("Running in Cluster2 mode"); - Client client = new Client(new InetSocketAddress( - "hadoop02.ilab.sztaki.hu", 6123), configuration); + Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), + configuration); client.run(jG, true); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java index 4cc1fb80c03..597b0aaeeb4 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java @@ -17,24 +17,20 @@ package eu.stratosphere.streaming.examples.batch.wordcount; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; -import eu.stratosphere.types.IntValue; -import eu.stratosphere.types.LongValue; -import eu.stratosphere.types.StringValue; public class BatchWordCountSink extends UserSinkInvokable { - private StringValue word = new StringValue(); - private IntValue count = new IntValue(); - private LongValue timestamp = new LongValue(); + private String word = ""; + private Integer count = 0; + private Long timestamp = 0L; @Override public void invoke(StreamRecord record) throws Exception { - word = (StringValue) record.getField(0, 0); - count = (IntValue) record.getField(0, 1); - timestamp = (LongValue) record.getField(0, 2); + word = record.getString(0); + count = record.getInteger(1); + timestamp = record.getLong(2); System.out.println("============================================"); - System.out.println(word.getValue() + " " + count.getValue() + " " - + timestamp.getValue()); + System.out.println(word + " " + count + " " + timestamp); System.out.println("============================================"); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java index 009acaf973f..55b40168266 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java @@ -19,27 +19,23 @@ import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; +import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; -import eu.stratosphere.types.LongValue; -import eu.stratosphere.types.StringValue; -import eu.stratosphere.types.Value; public class BatchWordCountSource extends UserSourceInvokable { private BufferedReader br = null; private String line = new String(); - private StringValue lineValue = new StringValue(); - private LongValue timestampValue = new LongValue(); - private Value[] values = new Value[2]; + private StreamRecord outRecord = new StreamRecord(new Tuple2()); + private final static int BATCH_SIZE = 10; - private long timestamp = 0; + private Long timestamp = 0L; public BatchWordCountSource() { try { - br = new BufferedReader(new FileReader( - "src/test/resources/testdata/hamlet.txt")); + br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt")); } catch (FileNotFoundException e) { e.printStackTrace(); } @@ -47,21 +43,19 @@ public class BatchWordCountSource extends UserSourceInvokable { @Override public void invoke() throws Exception { - timestamp = 0; - StreamRecord mottoRecords = new StreamRecord(2); + timestamp = 0L; + outRecord = new StreamRecord(2); + line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); while (line != null) { if (line != "") { - lineValue.setValue(line); - timestampValue.setValue(timestamp); - values[0] = lineValue; - values[1] = timestampValue; - mottoRecords.addRecord(values); + + outRecord.addRecord(new Tuple2(line, timestamp)); timestamp++; if (timestamp % BATCH_SIZE == 0) { - emit(mottoRecords); - mottoRecords = new StreamRecord(2); + emit(outRecord); + outRecord = new StreamRecord(new Tuple2(), BATCH_SIZE); } } line = br.readLine(); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java index 2b2cd974f7b..75c6647b51f 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java @@ -15,28 +15,30 @@ package eu.stratosphere.streaming.examples.batch.wordcount; +import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; -import eu.stratosphere.types.LongValue; -import eu.stratosphere.types.StringValue; public class BatchWordCountSplitter extends UserTaskInvokable { - private StringValue sentence = new StringValue(); - private LongValue timestamp = new LongValue(); + + private String[] words = new String[] {}; - private StringValue wordValue = new StringValue(); + private StreamRecord outputRecord = new StreamRecord(new Tuple2()); + + private Long timestamp =0L; + @Override public void invoke(StreamRecord record) throws Exception { int numberOfRecords = record.getNumOfRecords(); for (int i = 0; i < numberOfRecords; ++i) { - sentence = (StringValue) record.getField(i, 0); - timestamp = (LongValue) record.getField(i, 1); - words = sentence.getValue().split(" "); - for (CharSequence word : words) { - wordValue.setValue(word); - emit(new StreamRecord(wordValue, timestamp)); + words = record.getString(0).split(" "); + timestamp=record.getLong(1); + for (String word : words) { + outputRecord.setString(0, word); + outputRecord.setLong(1, timestamp); + emit(outputRecord); } } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java index 0292769bcb1..d5e9ddea744 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java @@ -25,6 +25,7 @@ import eu.stratosphere.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.util.LogUtils; +import eu.stratosphere.types.StringValue; public class WordCountLocal { @@ -36,10 +37,9 @@ public class WordCountLocal { graphBuilder.setSink("WordCountSink", WordCountSink.class); graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter"); - graphBuilder.shuffleConnect("WordCountSplitter","WordCountCounter"); -// graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class); + graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class); graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink"); - + return graphBuilder.getJobGraph(); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java index 8b1f58100d0..84552c492df 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java @@ -27,7 +27,7 @@ public class WordCountSource extends UserSourceInvokable { private BufferedReader br = null; private String line = new String(); - private Tuple1 lineTuple = new Tuple1(); + private StreamRecord outRecord = new StreamRecord(new Tuple1()); public WordCountSource() { try { @@ -42,9 +42,9 @@ public class WordCountSource extends UserSourceInvokable { line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); while (line != null) { if (line != "") { - lineTuple.setField(line, 0); + outRecord.setString(0,line); // TODO: object reuse - emit(new StreamRecord(lineTuple)); + emit(outRecord); } line = br.readLine(); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/FieldsPartitioner.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/FieldsPartitioner.java index f2f9f412dfb..9fa7bd34854 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/FieldsPartitioner.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/partitioner/FieldsPartitioner.java @@ -23,25 +23,29 @@ import eu.stratosphere.types.Key; public class FieldsPartitioner implements ChannelSelector { private int keyPosition; - private Class keyClass; +// private Class keyClass; public FieldsPartitioner(int keyPosition, Class keyClass) { this.keyPosition = keyPosition; - this.keyClass = keyClass; +// this.keyClass = keyClass; } @Override public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) { - Key key = null; - try { - key = keyClass.newInstance(); - } catch (InstantiationException e) { - e.printStackTrace(); - } catch (IllegalAccessException e) { - e.printStackTrace(); - } + //TODO:fix this +// Key key = null; +// try { +// key = keyClass.newInstance(); +// } catch (InstantiationException e) { +// e.printStackTrace(); +// } catch (IllegalAccessException e) { +// e.printStackTrace(); +// } // TODO: consider hash partition the whole record batch. - key = keyClass.cast(record.getField(0, keyPosition)); - return new int[] { Math.abs(key.hashCode()) % numberOfOutputChannels }; + +// +// } +// key = keyClass.cast(record.getField(0, keyPosition)); + return new int[] { Math.abs(record.getField(0, keyPosition).hashCode()) % numberOfOutputChannels }; } } -- GitLab