From ca1e41dab966179bffdeaa6971d217ec161b0b96 Mon Sep 17 00:00:00 2001 From: Yingjun Wu Date: Mon, 14 Jul 2014 16:29:10 +0200 Subject: [PATCH] [streaming] fix several bugs in in the examples. Window operator runnable --- .../api/streamrecord/StreamRecord.java | 30 ----- .../wordcount/BatchWordCountCounter.java | 43 ++++--- .../batch/wordcount/BatchWordCountSink.java | 16 +-- .../batch/wordcount/BatchWordCountSource.java | 32 ++--- .../wordcount/BatchWordCountSplitter.java | 27 ++-- .../wordcount/WindowWordCountCounter.java | 64 ++++++---- .../window/wordcount/WindowWordCountSink.java | 16 +-- .../wordcount/WindowWordCountSource.java | 19 +-- .../wordcount/WindowWordCountSplitter.java | 16 ++- .../examples/wordcount/WordCountCounter.java | 1 - .../examples/wordcount/WordCountLocal.java | 90 +++++--------- .../examples/wordcount/WordCountSource.java | 34 ++---- .../wordcount/WordCountSourceSplitter.java | 33 ++--- .../examples/wordcount/WordCountStarter.java | 115 ++++++++++++++++++ .../streaming/state/MutableTableState.java | 2 +- .../streaming/state/WindowState.java | 4 +- 16 files changed, 298 insertions(+), 244 deletions(-) mode change 100755 => 100644 flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java create mode 100644 flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index 89224e4001a..5ece5da4b62 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -137,7 +137,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { this.batchSize = batchSize; tupleBatch = new ArrayList(batchSize); tupleBatch.add(tuple); - } /** @@ -939,35 +938,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { throw new TupleSizeMismatchException(); } } - - /** - * Checks if the number of fields are equal to the batch field size then - * adds the shadow copy of Tuple to the end of the batch - * - * @param tuple - * Tuple to be added as the next record of the batch - */ - public void addShadowTuple(Tuple tuple) throws TupleSizeMismatchException { - addShadowTuple(numOfTuples, tuple); - } - - /** - * Checks if the number of fields are equal to the batch field size then - * inserts the shadow copy of Tuple to the given position into the recordbatch - * - * @param index - * Position of the added tuple - * @param tuple - * Tuple to be added as the next record of the batch - */ - public void addShadowTuple(int index, Tuple tuple) throws TupleSizeMismatchException { - if (tuple.getArity() == numOfFields) { - tupleBatch.add(index, tuple); - numOfTuples++; - } else { - throw new TupleSizeMismatchException(); - } - } /** * Creates a copy of the StreamRecord object by Serializing and diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountCounter.java index 7ca3fe190f3..498ab4c315e 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountCounter.java @@ -15,38 +15,45 @@ package eu.stratosphere.streaming.examples.batch.wordcount; -import java.util.HashMap; -import java.util.Map; - +import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.state.MutableTableState; +import eu.stratosphere.streaming.state.MutableTableStateIterator; public class BatchWordCountCounter extends UserTaskInvokable { - private Map wordCounts = new HashMap(); + private MutableTableState wordCounts = new MutableTableState(); private String word = ""; private Integer count = 0; private Long timestamp = 0L; - private StreamRecord outRecord = new StreamRecord(new Tuple3()); + private StreamRecord outRecord = new StreamRecord(3); @Override public void invoke(StreamRecord record) throws Exception { - word = record.getString(0); - timestamp = record.getLong(1); - - if (wordCounts.containsKey(word)) { - count = wordCounts.get(word) + 1; - wordCounts.put(word, count); - } else { - count = 1; - wordCounts.put(word, 1); + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + count = record.getInteger(i, 1); + timestamp = record.getLong(i, 2); + if (wordCounts.containsKey(word)) { + count = wordCounts.get(word) + 1; + wordCounts.put(word, count); + } else { + count = 1; + wordCounts.put(word, 1); + } } - outRecord.setString(0, word); - outRecord.setInteger(1, count); - outRecord.setLong(2, timestamp); - + MutableTableStateIterator iterator = wordCounts + .getIterator(); + while (iterator.hasNext()) { + Tuple2 tuple = iterator.next(); + Tuple3 outputTuple = new Tuple3( + (String) tuple.getField(0), (Integer) tuple.getField(1), timestamp); + outRecord.addTuple(outputTuple); + } emit(outRecord); } } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java index 597b0aaeeb4..cac944fb860 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java @@ -26,12 +26,14 @@ public class BatchWordCountSink extends UserSinkInvokable { @Override public void invoke(StreamRecord record) throws Exception { - word = record.getString(0); - count = record.getInteger(1); - timestamp = record.getLong(2); - System.out.println("============================================"); - System.out.println(word + " " + count + " " + timestamp); - System.out.println("============================================"); - + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + count = record.getInteger(i, 1); + timestamp = record.getLong(i, 2); + System.out.println("============================================"); + System.out.println(word + " " + count + " " + timestamp); + System.out.println("============================================"); + } } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java index 549b781b784..d8b167425c0 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java @@ -28,9 +28,6 @@ public class BatchWordCountSource extends UserSourceInvokable { private BufferedReader br = null; private String line = ""; private StreamRecord outRecord = new StreamRecord(new Tuple2()); - - private final static int BATCH_SIZE = 20; - private Long timestamp = 0L; public BatchWordCountSource() { @@ -39,29 +36,24 @@ public class BatchWordCountSource extends UserSourceInvokable { } catch (FileNotFoundException e) { e.printStackTrace(); } + timestamp = 0L; } @Override public void invoke() throws Exception { - timestamp = 0L; - outRecord = new StreamRecord(2); - - line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); - - while (line != null) { + for(int i=0; i<100; ++i) { + line = br.readLine(); + if(line==null){ + break; + } if (line != "") { - - outRecord.addTuple(new Tuple2(line, timestamp)); + line=line.replaceAll("[\\-\\+\\.\\^:,]", ""); + System.out.println("line="+line); + outRecord.setString(0, line); + outRecord.setLong(1, timestamp); timestamp++; - if (timestamp % BATCH_SIZE == 0) { - emit(outRecord); - outRecord = new StreamRecord(2); - } + emit(outRecord); } - - line = br.readLine(); - } } - -} \ No newline at end of file +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java index 7988607991a..6b4331d4b4d 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java @@ -15,31 +15,26 @@ package eu.stratosphere.streaming.examples.batch.wordcount; -import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; public class BatchWordCountSplitter extends UserTaskInvokable { - - - private String[] words = new String[] {}; - private StreamRecord outputRecord = new StreamRecord(new Tuple2()); + private StreamRecord outputRecord = new StreamRecord(3); - private Long timestamp =0L; - + private Long timestamp = 0L; @Override public void invoke(StreamRecord record) throws Exception { - int numberOfRecords = record.getNumOfTuples(); - for (int i = 0; i < numberOfRecords; ++i) { - words = record.getString(0).split(" "); - timestamp=record.getLong(1); - for (String word : words) { - outputRecord.setString(0, word); - outputRecord.setLong(1, timestamp); - emit(outputRecord); - } + words = record.getString(0).split(" "); + timestamp = record.getLong(1); + System.out.println("sentence=" + record.getString(0) + ", timestamp=" + + record.getLong(1)); + for (String word : words) { + Tuple3 tuple =new Tuple3(word, 1, timestamp); + outputRecord.addTuple(tuple); } + emit(outputRecord); } } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java index 1970b22b776..6f8b95586a8 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java @@ -15,10 +15,12 @@ package eu.stratosphere.streaming.examples.window.wordcount; +import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.state.MutableTableState; +import eu.stratosphere.streaming.state.MutableTableStateIterator; import eu.stratosphere.streaming.state.WindowState; public class WindowWordCountCounter extends UserTaskInvokable { @@ -34,36 +36,42 @@ public class WindowWordCountCounter extends UserTaskInvokable { private String word = ""; private Integer count = 0; private Long timestamp = 0L; - private StreamRecord outRecord = new StreamRecord( - new Tuple3()); + private StreamRecord outRecord = new StreamRecord(3); public WindowWordCountCounter() { windowSize = 100; slidingStep = 20; computeGranularity = 10; windowFieldId = 2; - window = new WindowState(windowSize, slidingStep, computeGranularity, windowFieldId); + window = new WindowState(windowSize, slidingStep, + computeGranularity, windowFieldId); wordCounts = new MutableTableState(); } private void incrementCompute(StreamRecord record) { - word = record.getString(0); - if (wordCounts.containsKey(word)) { - count = wordCounts.get(word) + 1; - wordCounts.put(word, count); - } else { - count = 1; - wordCounts.put(word, 1); + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + if (wordCounts.containsKey(word)) { + count = wordCounts.get(word) + 1; + wordCounts.put(word, count); + } else { + count = 1; + wordCounts.put(word, 1); + } } } private void decrementCompute(StreamRecord record) { - word = record.getString(0); - count = wordCounts.get(word) - 1; - if (count == 0) { - wordCounts.delete(word); - } else { - wordCounts.put(word, count); + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + count = wordCounts.get(word) - 1; + if (count == 0) { + wordCounts.delete(word); + } else { + wordCounts.put(word, count); + } } } @@ -75,18 +83,28 @@ public class WindowWordCountCounter extends UserTaskInvokable { decrementCompute(expiredRecord); window.pushBack(record); if (window.isComputable()) { - outRecord.setString(0, word); - outRecord.setInteger(1, count); - outRecord.setLong(2, timestamp); + MutableTableStateIterator iterator = wordCounts + .getIterator(); + while (iterator.hasNext()) { + Tuple2 tuple = iterator.next(); + Tuple3 outputTuple = new Tuple3( + (String) tuple.getField(0), (Integer) tuple.getField(1), timestamp); + outRecord.addTuple(outputTuple); + } emit(outRecord); } } else { incrementCompute(record); window.pushBack(record); - if(window.isFull()){ - outRecord.setString(0, word); - outRecord.setInteger(1, count); - outRecord.setLong(2, timestamp); + if (window.isFull()) { + MutableTableStateIterator iterator = wordCounts + .getIterator(); + while (iterator.hasNext()) { + Tuple2 tuple = iterator.next(); + Tuple3 outputTuple = new Tuple3( + (String) tuple.getField(0), (Integer) tuple.getField(1), timestamp); + outRecord.addTuple(outputTuple); + } emit(outRecord); } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java index 40ccf76b430..4e4977f54eb 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java @@ -26,12 +26,14 @@ public class WindowWordCountSink extends UserSinkInvokable { @Override public void invoke(StreamRecord record) throws Exception { - word = record.getString(0); - count = record.getInteger(1); - timestamp = record.getLong(2); - System.out.println("============================================"); - System.out.println(word + " " + count + " " + timestamp); - System.out.println("============================================"); - + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + count = record.getInteger(i, 1); + timestamp = record.getLong(i, 2); + System.out.println("============================================"); + System.out.println(word + " " + count + " " + timestamp); + System.out.println("============================================"); + } } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java index 99c3be26742..952656944f2 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java @@ -26,10 +26,9 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord; public class WindowWordCountSource extends UserSourceInvokable { private BufferedReader br = null; - private String line = new String(); + private String line = ""; private StreamRecord outRecord = new StreamRecord(new Tuple2()); - - private Long timestamp; + private Long timestamp = 0L; public WindowWordCountSource() { try { @@ -37,20 +36,24 @@ public class WindowWordCountSource extends UserSourceInvokable { } catch (FileNotFoundException e) { e.printStackTrace(); } + timestamp = 0L; } @Override public void invoke() throws Exception { - timestamp = 0L; - line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); - while (line != null) { + for(int i=0; i<10; ++i) { + line = br.readLine(); + if(line==null){ + break; + } if (line != "") { + line=line.replaceAll("[\\-\\+\\.\\^:,]", ""); + System.out.println("line="+line); outRecord.setString(0, line); outRecord.setLong(1, timestamp); + timestamp++; emit(outRecord); } - line = br.readLine(); - timestamp++; } } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java index 32aaa137475..a2c80c54d2b 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java @@ -15,14 +15,13 @@ package eu.stratosphere.streaming.examples.window.wordcount; -import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; public class WindowWordCountSplitter extends UserTaskInvokable { - private String[] words = new String[] {}; - private StreamRecord outputRecord = new StreamRecord(new Tuple2()); + private StreamRecord outputRecord = new StreamRecord(3); private Long timestamp = 0L; @@ -30,13 +29,12 @@ public class WindowWordCountSplitter extends UserTaskInvokable { public void invoke(StreamRecord record) throws Exception { words = record.getString(0).split(" "); timestamp = record.getLong(1); - System.out.println("************sentence=" + words + ", timestamp=" + timestamp - + "************"); + System.out.println("sentence=" + record.getString(0) + ", timestamp=" + + record.getLong(1)); for (String word : words) { - outputRecord.setString(0, word); - outputRecord.setLong(1, timestamp); - emit(outputRecord); + Tuple3 tuple =new Tuple3(word, 1, timestamp); + outputRecord.addTuple(tuple); } - + emit(outputRecord); } } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java index e80623419be..c763274d043 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java @@ -19,7 +19,6 @@ import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.state.MutableTableState; -import eu.stratosphere.streaming.util.PerformanceCounter; public class WordCountCounter extends UserTaskInvokable { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java index 7d023a6aed8..78a590c9e3c 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java @@ -28,16 +28,11 @@ import eu.stratosphere.streaming.util.LogUtils; public class WordCountLocal { - private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance, - int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks, - int sinkSubtasksPerInstance) throws Exception { + public static JobGraph getJobGraph() { JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); - graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class, - sourceSubtasks, sourceSubtasksPerInstance); - graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks, - counterSubtasksPerInstance); - graphBuilder.setSink("WordCountSink", WordCountSink.class, sinkSubtasks, - sinkSubtasksPerInstance); + graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class); + graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1, 1); + graphBuilder.setSink("WordCountSink", WordCountSink.class); graphBuilder.fieldsConnect("WordCountSourceSplitter", "WordCountCounter", 0); graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink"); @@ -45,71 +40,44 @@ public class WordCountLocal { return graphBuilder.getJobGraph(); } - private static void wrongArgs() { - System.out - .println("USAGE:\n" - + "run "); - } - - // TODO: arguments check public static void main(String[] args) { - if (args.length != 7) { - wrongArgs(); - } else { - LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO); - - int sourceSubtasks = 1; - int sourceSubtasksPerInstance = 1; - int counterSubtasks = 1; - int counterSubtasksPerInstance = 1; - int sinkSubtasks = 1; - int sinkSubtasksPerInstance = 1; - - try { - sourceSubtasks = Integer.parseInt(args[1]); - sourceSubtasksPerInstance = Integer.parseInt(args[2]); - counterSubtasks = Integer.parseInt(args[3]); - counterSubtasksPerInstance = Integer.parseInt(args[4]); - sinkSubtasks = Integer.parseInt(args[5]); - sinkSubtasksPerInstance = Integer.parseInt(args[6]); - } catch (Exception e) { - wrongArgs(); + LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO); + + try { + + JobGraph jG = getJobGraph(); + Configuration configuration = jG.getJobConfiguration(); + + if (args.length == 0) { + args = new String[] { "local" }; } - try { - JobGraph jG = getJobGraph(sourceSubtasks, sourceSubtasksPerInstance, - counterSubtasks, counterSubtasksPerInstance, sinkSubtasks, - sinkSubtasksPerInstance); - Configuration configuration = jG.getJobConfiguration(); + if (args[0].equals("local")) { + System.out.println("Running in Local mode"); + NepheleMiniCluster exec = new NepheleMiniCluster(); - if (args.length == 0) { - args = new String[] { "local" }; - } + exec.start(); - if (args[0].equals("local")) { - System.out.println("Running in Local mode"); - NepheleMiniCluster exec = new NepheleMiniCluster(); + Client client = new Client(new InetSocketAddress("localhost", 6498), configuration); - exec.start(); + client.run(jG, true); - Client client = new Client(new InetSocketAddress("localhost", 6498), - configuration); + exec.stop(); - client.run(jG, true); + } else if (args[0].equals("cluster")) { + System.out.println("Running in Cluster2 mode"); - exec.stop(); - } else if (args[0].equals("cluster")) { - System.out.println("Running in Cluster mode"); + Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), + configuration); - Client client = new Client(new InetSocketAddress("dell150", 6123), - configuration); - client.run(jG, true); - } + client.run(jG, true); - } catch (Exception e) { - System.out.println(e); } + + } catch (Exception e) { + System.out.println(e); } + } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java index 263d80e28f4..c61db134035 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java @@ -16,7 +16,6 @@ package eu.stratosphere.streaming.examples.wordcount; import java.io.BufferedReader; -import java.io.FileNotFoundException; import java.io.FileReader; import eu.stratosphere.api.java.tuple.Tuple1; @@ -31,27 +30,20 @@ public class WordCountSource extends UserSourceInvokable { @Override public void invoke() throws Exception { - - for (int i = 0; i < 2; i++) { - try { - br = new BufferedReader(new FileReader( - "/home/strato/stratosphere-distrib/resources/hamlet.txt")); - - line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); - while (line != null) { - if (line != "") { - outRecord.setString(0, line); - emit(outRecord); - performanceCounter.count(); - } - line = br.readLine(); - } - - } catch (FileNotFoundException e) { - e.printStackTrace(); + br = new BufferedReader(new FileReader( + "src/test/resources/testdata/hamlet.txt")); + while (true) { + line = br.readLine(); + if (line == null) { + break; } + if (line != "") { + line=line.replaceAll("[\\-\\+\\.\\^:,]", ""); + outRecord.setString(0, line); + emit(outRecord); + performanceCounter.count(); + } + line = br.readLine(); } - } - } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java old mode 100755 new mode 100644 index 534db04ee77..9c5aa3e9037 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java @@ -16,7 +16,6 @@ package eu.stratosphere.streaming.examples.wordcount; import java.io.BufferedReader; -import java.io.FileNotFoundException; import java.io.FileReader; import eu.stratosphere.api.java.tuple.Tuple1; @@ -31,28 +30,22 @@ public class WordCountSourceSplitter extends UserSourceInvokable { @Override public void invoke() throws Exception { - + br = new BufferedReader(new FileReader( + "src/test/resources/testdata/hamlet.txt")); while (true) { - try { - br = new BufferedReader(new FileReader( - "/home/strato/stratosphere-distrib/resources/hamlet.txt")); - line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); - while (line != null) { - if (line != "") { - for (String word : line.split(" ")) { - outRecord.setString(0, word); - emit(outRecord); - performanceCounter.count(); - } - } - line = br.readLine(); + line = br.readLine(); + if (line == null) { + break; + } + if (line != "") { + line=line.replaceAll("[\\-\\+\\.\\^:,]", ""); + for (String word : line.split(" ")) { + outRecord.setString(0, word); + System.out.println("word=" + word); + emit(outRecord); + performanceCounter.count(); } - } catch (FileNotFoundException e) { - e.printStackTrace(); } - } - } - } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java new file mode 100644 index 00000000000..4002701dad3 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java @@ -0,0 +1,115 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.wordcount; + +import java.net.InetSocketAddress; + +import org.apache.log4j.Level; + +import eu.stratosphere.client.minicluster.NepheleMiniCluster; +import eu.stratosphere.client.program.Client; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.util.LogUtils; + +public class WordCountStarter { + + private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance, + int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks, + int sinkSubtasksPerInstance) throws Exception { + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class, + sourceSubtasks, sourceSubtasksPerInstance); + graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks, + counterSubtasksPerInstance); + graphBuilder.setSink("WordCountSink", WordCountSink.class, sinkSubtasks, + sinkSubtasksPerInstance); + + graphBuilder.fieldsConnect("WordCountSourceSplitter", "WordCountCounter", 0); + graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink"); + + return graphBuilder.getJobGraph(); + } + + private static void wrongArgs() { + System.out + .println("USAGE:\n" + + "run "); + } + + // TODO: arguments check + public static void main(String[] args) { + + if (args.length != 7) { + wrongArgs(); + } else { + LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO); + + int sourceSubtasks = 1; + int sourceSubtasksPerInstance = 1; + int counterSubtasks = 1; + int counterSubtasksPerInstance = 1; + int sinkSubtasks = 1; + int sinkSubtasksPerInstance = 1; + + try { + sourceSubtasks = Integer.parseInt(args[1]); + sourceSubtasksPerInstance = Integer.parseInt(args[2]); + counterSubtasks = Integer.parseInt(args[3]); + counterSubtasksPerInstance = Integer.parseInt(args[4]); + sinkSubtasks = Integer.parseInt(args[5]); + sinkSubtasksPerInstance = Integer.parseInt(args[6]); + } catch (Exception e) { + wrongArgs(); + } + + try { + JobGraph jG = getJobGraph(sourceSubtasks, sourceSubtasksPerInstance, + counterSubtasks, counterSubtasksPerInstance, sinkSubtasks, + sinkSubtasksPerInstance); + Configuration configuration = jG.getJobConfiguration(); + + if (args.length == 0) { + args = new String[] { "local" }; + } + + if (args[0].equals("local")) { + System.out.println("Running in Local mode"); + NepheleMiniCluster exec = new NepheleMiniCluster(); + + exec.start(); + + Client client = new Client(new InetSocketAddress("localhost", 6498), + configuration); + + client.run(jG, true); + + exec.stop(); + } else if (args[0].equals("cluster")) { + System.out.println("Running in Cluster mode"); + + Client client = new Client(new InetSocketAddress("dell150", 6123), + configuration); + client.run(jG, true); + } + + } catch (Exception e) { + System.out.println(e); + } + } + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableState.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableState.java index 9ee557e605b..93bccbfac43 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableState.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableState.java @@ -49,7 +49,7 @@ public class MutableTableState implements TableState { } @Override - public TableStateIterator getIterator() { + public MutableTableStateIterator getIterator() { // TODO Auto-generated method stub return new MutableTableStateIterator(state.entrySet().iterator()); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java index d54205c804c..732a0ffb6ab 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java @@ -65,7 +65,7 @@ public class WindowState { public void pushBack(StreamRecord record) { if (initTimestamp == -1) { - initTimestamp = record.getTuple(0).getField(windowFieldId); + initTimestamp = (Integer) record.getTuple(0).getField(windowFieldId); nextTimestamp = initTimestamp + computeGranularity; tempRecord = new StreamRecord(record.getNumOfFields()); } @@ -75,7 +75,7 @@ public class WindowState { currentRecordCount += 1; tempRecord = new StreamRecord(record.getNumOfFields()); } - tempRecord.addShadowTuple(record.getTuple(i)); + tempRecord.addTuple(record.getTuple(i)); } } -- GitLab