From 3ae8663210606a063e7f22f375fc42528425118f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=C3=A1rton=20Balassi?= Date: Mon, 14 Jul 2014 16:28:58 +0200 Subject: [PATCH] [streaming] WindowedWordCount Refactor --- .../streaming/api/FaultToleranceBuffer.java | 270 ------------------ .../wordcount/WindowWordCountCounter.java | 7 +- .../window/wordcount/WindowWordCountSink.java | 12 +- .../wordcount/WindowWordCountSource.java | 37 ++- .../wordcount/WindowWordCountSplitter.java | 14 +- .../test/wordcount/WordCountLocal.java | 133 --------- .../test/wordcount/WordCountSource.java | 9 +- 7 files changed, 44 insertions(+), 438 deletions(-) delete mode 100644 flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java delete mode 100644 flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java deleted file mode 100644 index b9e3a6b1c05..00000000000 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java +++ /dev/null @@ -1,270 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ - -package eu.stratosphere.streaming.api; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; - -import eu.stratosphere.nephele.io.RecordWriter; - -/** - * An object to provide fault tolerance for Stratosphere stream processing. It - * works as a buffer to hold StreamRecords for a task for re-emitting failed, or - * timed out records. - */ -public class FaultToleranceBuffer { - - private long TIMEOUT = 1000; - - private Long timeOfLastUpdate; - private Map recordBuffer; - private Map ackCounter; - private SortedMap> recordsByTime; - private Map recordTimestamps; - - private List> outputs; - private final String channelID; - - private int numberOfOutputs; - - /** - * Creates fault tolerance buffer object for the given output channels and - * channel ID - * - * @param outputs - * List of outputs - * @param channelID - * ID of the task object that uses this buffer - */ - public FaultToleranceBuffer(List> outputs, - String channelID) { - this.timeOfLastUpdate = System.currentTimeMillis(); - this.outputs = outputs; - this.recordBuffer = new HashMap(); - this.ackCounter = new HashMap(); - this.numberOfOutputs = outputs.size(); - this.channelID = channelID; - this.recordsByTime = new TreeMap>(); - this.recordTimestamps = new HashMap(); - } - - /** - * Adds the record to the fault tolerance buffer. This record will be - * monitored for acknowledgements and timeout. - * - */ - public void addRecord(StreamRecord streamRecord) { - - recordBuffer.put(streamRecord.getId(), streamRecord); - ackCounter.put(streamRecord.getId(), numberOfOutputs); - addTimestamp(streamRecord.getId()); - } - - /** - * Checks for records that have timed out since the last check and fails them. - * - * @param currentTime - * Time when the check should be made, usually current system time. - * @return Returns the list of the records that have timed out. - */ - List timeoutRecords(Long currentTime) { - if (timeOfLastUpdate + TIMEOUT < currentTime) { - List timedOutRecords = new LinkedList(); - Map> timedOut = recordsByTime.subMap(0L, currentTime - - TIMEOUT); - - for (Set recordSet : timedOut.values()) { - if (!recordSet.isEmpty()) { - for (String recordID : recordSet) { - timedOutRecords.add(recordID); - } - } - } - - recordsByTime.keySet().removeAll(timedOut.keySet()); - for (String recordID : timedOutRecords) { - failRecord(recordID); - } - - timeOfLastUpdate = currentTime; - return timedOutRecords; - } - return null; - } - - /** - * Stores time stamp for a record by recordID and also adds the record to a - * map which maps a time stamp to the IDs of records that were emitted at that - * time. - *

- * Later used for timeouts. - * - * @param recordID - * ID of the record - */ - public void addTimestamp(String recordID) { - Long currentTime = System.currentTimeMillis(); - recordTimestamps.put(recordID, currentTime); - - if (recordsByTime.containsKey(currentTime)) { - recordsByTime.get(currentTime).add(recordID); - } else { - Set recordSet = new HashSet(); - recordSet.add(recordID); - recordsByTime.put(currentTime, recordSet); - } - } - - /** - * Returns a StreamRecord after removing it from the buffer - * - * @param recordID - * The ID of the record that will be popped - */ - public StreamRecord popRecord(String recordID) { - System.out.println("Pop ID: " + recordID); - StreamRecord record = recordBuffer.get(recordID); - removeRecord(recordID); - return record; - } - - /** - * Removes a StreamRecord by ID from the fault tolerance buffer, further acks - * will have no effects for this record. - * - * @param recordID - * The ID of the record that will be removed - * - */ - void removeRecord(String recordID) { - recordBuffer.remove(recordID); - ackCounter.remove(recordID); - try { - Long ts = recordTimestamps.remove(recordID); - recordsByTime.get(ts).remove(recordID); - } catch (NullPointerException e) { - - } catch (Exception e) { - e.printStackTrace(); - System.out.println(recordID); - } - } - - /** - * Acknowledges the record of the given ID, if all the outputs have sent - * acknowledgments, removes it from the buffer - * - * @param recordID - * ID of the record that has been acknowledged - */ - // TODO: find a place to call timeoutRecords - public void ackRecord(String recordID) { - if (ackCounter.containsKey(recordID)) { - int ackCount = ackCounter.get(recordID) - 1; - - if (ackCount == 0) { - removeRecord(recordID); - } else { - ackCounter.put(recordID, ackCount); - } - } - // timeoutRecords(System.currentTimeMillis()); - } - - /** - * Re-emits the failed record for the given ID, removes the old record and - * stores it with a new ID. - * - * @param recordID - * ID of the record that has been failed - */ - public void failRecord(String recordID) { - // Create new id to avoid double counting acks - System.out.println("Fail ID: " + recordID); - StreamRecord newRecord = popRecord(recordID).setId(channelID); - addRecord(newRecord); - reEmit(newRecord); - } - - /** - * Emit give record to all output channels - * - * @param record - * Record to be re-emitted - */ - public void reEmit(StreamRecord record) { - for (RecordWriter output : outputs) { - try { - output.emit(record); - System.out.println("Re-emitted"); - } catch (Exception e) { - System.out.println("Re-emit failed"); - } - } - - } - - public long getTIMEOUT() { - return this.TIMEOUT; - } - - public void setTIMEOUT(long TIMEOUT) { - this.TIMEOUT = TIMEOUT; - } - - public Map getRecordBuffer() { - return this.recordBuffer; - } - - public Long getTimeOfLastUpdate() { - return this.timeOfLastUpdate; - } - - public Map getAckCounter() { - return this.ackCounter; - } - - public SortedMap> getRecordsByTime() { - return this.recordsByTime; - } - - public Map getRecordTimestamps() { - return this.recordTimestamps; - } - - public List> getOutputs() { - return this.outputs; - } - - public String getChannelID() { - return this.channelID; - } - - public int getNumberOfOutputs() { - return this.numberOfOutputs; - } - - void setNumberOfOutputs(int numberOfOutputs) { - this.numberOfOutputs = numberOfOutputs; - } - -} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountCounter.java index 533e8ebc1fa..a77183c19e9 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountCounter.java @@ -18,7 +18,6 @@ package eu.stratosphere.streaming.test.window.wordcount; import java.util.HashMap; import java.util.Map; -import eu.stratosphere.streaming.api.AtomRecord; import eu.stratosphere.streaming.api.StreamRecord; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.types.IntValue; @@ -35,7 +34,6 @@ public class WindowWordCountCounter extends UserTaskInvokable { private IntValue countValue = new IntValue(1); private LongValue timestamp = new LongValue(0); private String word = ""; - private AtomRecord outputRecord = new AtomRecord(3); private int count = 1; @Override @@ -51,10 +49,7 @@ public class WindowWordCountCounter extends UserTaskInvokable { wordCounts.put(word, 1); countValue.setValue(1); } - outputRecord.setField(0, wordValue); - outputRecord.setField(1, countValue); - outputRecord.setField(2, timestamp); - emit(new StreamRecord(outputRecord)); + emit(new StreamRecord(wordValue, countValue, timestamp)); } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSink.java index 6082d52459e..d9fb2b4e400 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSink.java @@ -23,15 +23,15 @@ import eu.stratosphere.types.StringValue; public class WindowWordCountSink implements UserSinkInvokable { - private StringValue word = new StringValue(""); - private IntValue count = new IntValue(1); - private LongValue timestamp = new LongValue(0); + private StringValue word = new StringValue(); + private IntValue count = new IntValue(); + private LongValue timestamp = new LongValue(); @Override public void invoke(StreamRecord record) throws Exception { - word=(StringValue) record.getField(0, 0); - count=(IntValue) record.getField(0, 1); - timestamp=(LongValue) record.getField(0, 2); + word = (StringValue) record.getField(0, 0); + count = (IntValue) record.getField(0, 1); + timestamp = (LongValue) record.getField(0, 2); System.out.println("============================================"); System.out.println(word.getValue() + " " + count.getValue() + " " + timestamp.getValue()); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSource.java index fe6926ea274..0d839e07d03 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSource.java @@ -15,26 +15,45 @@ package eu.stratosphere.streaming.test.window.wordcount; -import eu.stratosphere.streaming.api.AtomRecord; +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.FileReader; + import eu.stratosphere.streaming.api.StreamRecord; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.types.LongValue; import eu.stratosphere.types.StringValue; public class WindowWordCountSource extends UserSourceInvokable { - private final String motto = "Gyuszi Gabor Big Marci Gyuszi"; + + private BufferedReader br = null; + private String line = new String(); + private StringValue lineValue = new StringValue(); + private LongValue timestampValue = new LongValue(); + private long timestamp; - private AtomRecord mottoRecord = null; + + public WindowWordCountSource() { + try { + br = new BufferedReader(new FileReader( + "src/test/resources/testdata/hamlet.txt")); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } + } @Override public void invoke() throws Exception { timestamp = 0; - for (int i = 0; i < 2; ++i) { - mottoRecord = new AtomRecord(2); - mottoRecord.setField(0, new StringValue(motto)); - mottoRecord.setField(1, new LongValue(timestamp)); - emit(new StreamRecord(mottoRecord)); - ++timestamp; + line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); + while (line != null) { + if (line != "") { + lineValue.setValue(line); + timestampValue.setValue(timestamp); + emit(new StreamRecord(lineValue, timestampValue)); + } + line = br.readLine(); + timestamp++; } } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSplitter.java index bd701407555..bc4c9f33aa7 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountSplitter.java @@ -15,7 +15,6 @@ package eu.stratosphere.streaming.test.window.wordcount; -import eu.stratosphere.streaming.api.AtomRecord; import eu.stratosphere.streaming.api.StreamRecord; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.types.LongValue; @@ -23,11 +22,10 @@ import eu.stratosphere.types.StringValue; public class WindowWordCountSplitter extends UserTaskInvokable { - private StringValue sentence = new StringValue(""); - private LongValue timestamp = new LongValue(0); - private String[] words = new String[0]; - private StringValue wordValue = new StringValue(""); - private AtomRecord outputRecord = new AtomRecord(2); + private StringValue sentence = new StringValue(); + private LongValue timestamp = new LongValue(); + private String[] words = new String[]{}; + private StringValue wordValue = new StringValue(); @Override public void invoke(StreamRecord record) throws Exception { @@ -38,9 +36,7 @@ public class WindowWordCountSplitter extends UserTaskInvokable { words = sentence.getValue().split(" "); for (CharSequence word : words) { wordValue.setValue(word); - outputRecord.setField(0, wordValue); - outputRecord.setField(1, timestamp); - emit(new StreamRecord(outputRecord)); + emit(new StreamRecord(wordValue, timestamp)); } } } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java deleted file mode 100644 index 8c97151d498..00000000000 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java +++ /dev/null @@ -1,133 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ - -package eu.stratosphere.streaming.test.wordcount; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.log4j.Level; -import org.junit.Assert; - -import eu.stratosphere.client.minicluster.NepheleMiniCluster; -import eu.stratosphere.configuration.Configuration; -import eu.stratosphere.nephele.client.JobClient; -import eu.stratosphere.nephele.jobgraph.JobGraph; -import eu.stratosphere.streaming.api.JobGraphBuilder; -import eu.stratosphere.types.StringValue; -import eu.stratosphere.util.LogUtils; - -public class WordCountLocal { - - private static final int MINIMUM_HEAP_SIZE_MB = 192; - - protected final Configuration config; - - private NepheleMiniCluster executor; - - public WordCountLocal() { - this(new Configuration()); - } - - public WordCountLocal(Configuration config) { - verifyJvmOptions(); - this.config = config; - - LogUtils.initializeDefaultConsoleLogger(Level.WARN); - } - - - private void verifyJvmOptions() { - long heap = Runtime.getRuntime().maxMemory() >> 20; - Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB - + "m", heap > MINIMUM_HEAP_SIZE_MB - 50); - } - - - public void startCluster() throws Exception { - this.executor = new NepheleMiniCluster(); - this.executor.setDefaultOverwriteFiles(true); - this.executor.start(); - } - - public void stopCluster() throws Exception { - try { - if (this.executor != null) { - this.executor.stop(); - this.executor = null; - FileSystem.closeAll(); - System.gc(); - } - } finally { - } - } - - public void runJob() throws Exception { - // submit job - JobGraph jobGraph = null; - try { - jobGraph = getJobGraph(); - } - catch(Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - Assert.fail("Failed to obtain JobGraph!"); - } - - Assert.assertNotNull("Obtained null JobGraph", jobGraph); - - try { - JobClient client = null; - try { - client = this.executor.getJobClient(jobGraph); } - catch(Exception e) { - System.err.println("here"); - } - client.submitJobAndWait(); - } - catch(Exception e) { - System.err.println(e.getMessage()); - e.printStackTrace(); - Assert.fail("Job execution failed!"); - } - } - - - protected JobGraph getJobGraph() throws Exception { - JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); - graphBuilder.setSource("WordCountSource", WordCountSource.class); - graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2); - graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2); - graphBuilder.setSink("WordCountSink", WordCountSink.class); - - graphBuilder.broadcastConnect("WordCountSource", "WordCountSplitter"); - graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, - StringValue.class); - graphBuilder.broadcastConnect("WordCountCounter", "WordCountSink"); - - return graphBuilder.getJobGraph(); - } - - - - public static void main(String[] args){ - WordCountLocal wC = new WordCountLocal(); - - try { - wC.startCluster(); - wC.runJob(); - wC.stopCluster(); - } catch (Exception e) {} - - } -} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java index 20059aa0407..0014466df26 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java @@ -29,8 +29,7 @@ public class WordCountSource extends UserSourceInvokable { private BufferedReader br = null; private String line = new String(); private StringValue lineValue = new StringValue(); - private Value[] hamletValues = new StringValue[1]; - private StreamRecord hamletRecord = new StreamRecord(1); + private Value[] values = new StringValue[1]; public WordCountSource() { try { @@ -47,9 +46,9 @@ public class WordCountSource extends UserSourceInvokable { while (line != null) { if (line != "") { lineValue.setValue(line); - hamletValues[0] = lineValue; - // TODO: use hamletRecord instead - emit(new StreamRecord(hamletValues)); + values[0] = lineValue; + // TODO: object reuse + emit(new StreamRecord(values)); } line = br.readLine(); } -- GitLab