diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java
new file mode 100644
index 0000000000000000000000000000000000000000..b9e3a6b1c05bb8634e589c10da3987f8db2a033f
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java
@@ -0,0 +1,270 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.api;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import eu.stratosphere.nephele.io.RecordWriter;
+
+/**
+ * An object that provides fault tolerance for Stratosphere stream processing.
+ * It works as a buffer that holds the StreamRecords emitted by a task so that
+ * failed or timed-out records can be re-emitted.
+ */
+public class FaultToleranceBuffer {
+
+    private long TIMEOUT = 1000;
+
+    private Long timeOfLastUpdate;
+    private Map<String, StreamRecord> recordBuffer;
+    private Map<String, Integer> ackCounter;
+    private SortedMap<Long, Set<String>> recordsByTime;
+    private Map<String, Long> recordTimestamps;
+
+    private List<RecordWriter<StreamRecord>> outputs;
+    private final String channelID;
+
+    private int numberOfOutputs;
+
+    /**
+     * Creates a fault tolerance buffer object for the given output channels and
+     * channel ID.
+     *
+     * @param outputs
+     *            List of outputs
+     * @param channelID
+     *            ID of the task object that uses this buffer
+     */
+    public FaultToleranceBuffer(List<RecordWriter<StreamRecord>> outputs,
+            String channelID) {
+        this.timeOfLastUpdate = System.currentTimeMillis();
+        this.outputs = outputs;
+        this.recordBuffer = new HashMap<String, StreamRecord>();
+        this.ackCounter = new HashMap<String, Integer>();
+        this.numberOfOutputs = outputs.size();
+        this.channelID = channelID;
+        this.recordsByTime = new TreeMap<Long, Set<String>>();
+        this.recordTimestamps = new HashMap<String, Long>();
+    }
+
+    /**
+     * Adds the record to the fault tolerance buffer. The record will be
+     * monitored for acknowledgements and timeout.
+     *
+     * @param streamRecord
+     *            Record to be buffered
+     */
+    public void addRecord(StreamRecord streamRecord) {
+
+        recordBuffer.put(streamRecord.getId(), streamRecord);
+        ackCounter.put(streamRecord.getId(), numberOfOutputs);
+        addTimestamp(streamRecord.getId());
+    }
+
+    /**
+     * Checks for records that have timed out since the last check and fails them.
+     *
+     * @param currentTime
+     *            Time when the check is made, usually the current system time
+     * @return List of the IDs of the records that have timed out, or null if the
+     *         check was skipped because not enough time has passed
+     */
+    List<String> timeoutRecords(Long currentTime) {
+        if (timeOfLastUpdate + TIMEOUT < currentTime) {
+            List<String> timedOutRecords = new LinkedList<String>();
+            Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L, currentTime
+                    - TIMEOUT);
+
+            for (Set<String> recordSet : timedOut.values()) {
+                if (!recordSet.isEmpty()) {
+                    for (String recordID : recordSet) {
+                        timedOutRecords.add(recordID);
+                    }
+                }
+            }
+
+            recordsByTime.keySet().removeAll(timedOut.keySet());
+            for (String recordID : timedOutRecords) {
+                failRecord(recordID);
+            }
+
+            timeOfLastUpdate = currentTime;
+            return timedOutRecords;
+        }
+        return null;
+    }
+
+    /**
+     * Stores the time stamp for a record by record ID and also adds the record ID
+     * to a map that maps each time stamp to the IDs of the records emitted at that
+     * time.
+     *

+     * Later used for timeouts.
+     *
+     * @param recordID
+     *            ID of the record
+     */
+    public void addTimestamp(String recordID) {
+        Long currentTime = System.currentTimeMillis();
+        recordTimestamps.put(recordID, currentTime);
+
+        if (recordsByTime.containsKey(currentTime)) {
+            recordsByTime.get(currentTime).add(recordID);
+        } else {
+            Set<String> recordSet = new HashSet<String>();
+            recordSet.add(recordID);
+            recordsByTime.put(currentTime, recordSet);
+        }
+    }
+
+    /**
+     * Returns a StreamRecord after removing it from the buffer.
+     *
+     * @param recordID
+     *            The ID of the record that will be popped
+     * @return The removed record
+     */
+    public StreamRecord popRecord(String recordID) {
+        System.out.println("Pop ID: " + recordID);
+        StreamRecord record = recordBuffer.get(recordID);
+        removeRecord(recordID);
+        return record;
+    }
+
+    /**
+     * Removes a StreamRecord by ID from the fault tolerance buffer; further acks
+     * have no effect for this record.
+     *
+     * @param recordID
+     *            The ID of the record that will be removed
+     */
+    void removeRecord(String recordID) {
+        recordBuffer.remove(recordID);
+        ackCounter.remove(recordID);
+        try {
+            Long ts = recordTimestamps.remove(recordID);
+            recordsByTime.get(ts).remove(recordID);
+        } catch (NullPointerException e) {
+            // timestamp entry was already removed, nothing to do
+        } catch (Exception e) {
+            e.printStackTrace();
+            System.out.println(recordID);
+        }
+    }
+
+    /**
+     * Acknowledges the record with the given ID; if all the outputs have sent
+     * acknowledgements, removes it from the buffer.
+     *
+     * @param recordID
+     *            ID of the record that has been acknowledged
+     */
+    // TODO: find a place to call timeoutRecords
+    public void ackRecord(String recordID) {
+        if (ackCounter.containsKey(recordID)) {
+            int ackCount = ackCounter.get(recordID) - 1;
+
+            if (ackCount == 0) {
+                removeRecord(recordID);
+            } else {
+                ackCounter.put(recordID, ackCount);
+            }
+        }
+        // timeoutRecords(System.currentTimeMillis());
+    }
+
+    /**
+     * Re-emits the failed record with the given ID, removing the old record and
+     * storing it under a new ID.
+     *
+     * @param recordID
+     *            ID of the record that failed
+     */
+    public void failRecord(String recordID) {
+        // Create a new ID to avoid double counting acks
+        System.out.println("Fail ID: " + recordID);
+        StreamRecord newRecord = popRecord(recordID).setId(channelID);
+        addRecord(newRecord);
+        reEmit(newRecord);
+    }
+
+    /**
+     * Emits the given record to all output channels.
+     *
+     * @param record
+     *            Record to be re-emitted
+     */
+    public void reEmit(StreamRecord record) {
+        for (RecordWriter<StreamRecord> output : outputs) {
+            try {
+                output.emit(record);
+                System.out.println("Re-emitted");
+            } catch (Exception e) {
+                System.out.println("Re-emit failed");
+            }
+        }
+    }
+
+    public long getTIMEOUT() {
+        return this.TIMEOUT;
+    }
+
+    public void setTIMEOUT(long TIMEOUT) {
+        this.TIMEOUT = TIMEOUT;
+    }
+
+    public Map<String, StreamRecord> getRecordBuffer() {
+        return this.recordBuffer;
+    }
+
+    public Long getTimeOfLastUpdate() {
+        return this.timeOfLastUpdate;
+    }
+
+    public Map<String, Integer> getAckCounter() {
+        return this.ackCounter;
+    }
+
+    public SortedMap<Long, Set<String>> getRecordsByTime() {
+        return this.recordsByTime;
+    }
+
+    public Map<String, Long> getRecordTimestamps() {
+        return this.recordTimestamps;
+    }
+
+    public List<RecordWriter<StreamRecord>> getOutputs() {
+        return this.outputs;
+    }
+
+    public String getChannelID() {
+        return this.channelID;
+    }
+
+    public int getNumberOfOutputs() {
+        return this.numberOfOutputs;
+    }
+
+    void setNumberOfOutputs(int numberOfOutputs) {
+        this.numberOfOutputs = numberOfOutputs;
+    }
+
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCount.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCount.java
index 158ba0aede46bade6a2769fd406a9781fee87516..1b261199c763d4c3242ca4a96fb599ad5a12fb70 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCount.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCount.java
@@ -27,15 +27,20 @@ public class WindowWordCount extends TestBase2 {
     @Override
     public JobGraph getJobGraph() {
         JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
-        graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
-        graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1);
-        graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1);
+        graphBuilder.setSource("WindowWordCountSource",
+                WindowWordCountSource.class);
+        graphBuilder.setTask("WindowWordCountSplitter",
+                WindowWordCountSplitter.class, 1);
+        graphBuilder.setTask("WindowWordCountCounter",
+                WindowWordCountCounter.class, 1);
         graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class);
 
-        graphBuilder.broadcastConnect("WindowWordCountSource", "WindowWordCountSplitter");
-        graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0,
-                StringValue.class);
-        graphBuilder.broadcastConnect("WindowWordCountCounter", "WindowWordCountSink");
+        graphBuilder.broadcastConnect("WindowWordCountSource",
+                "WindowWordCountSplitter");
+        graphBuilder.fieldsConnect("WindowWordCountSplitter",
+                "WindowWordCountCounter", 0, StringValue.class);
+        graphBuilder.broadcastConnect("WindowWordCountCounter",
+                "WindowWordCountSink");
 
         return graphBuilder.getJobGraph();
     }
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountCounter.java
index 72cf64cc3c0dd6c26229ab7a1f2962a4b87490b2..533e8ebc1fa59d00580067ec6bec1f778dbeb9a7 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountCounter.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/window/wordcount/WindowWordCountCounter.java
@@ -26,10 +26,10 @@ import eu.stratosphere.types.LongValue;
 import eu.stratosphere.types.StringValue;
 
 public class WindowWordCountCounter extends UserTaskInvokable {
-    
+
     private int windowSize = 100;
     private int slidingStep = 20;
-    
+
     private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
     private StringValue wordValue = new StringValue("");
     private IntValue countValue = new IntValue(1);
@@ -40,8 +40,8 @@ public class WindowWordCountCounter extends UserTaskInvokable {
 
     @Override
     public void invoke(StreamRecord record) throws Exception {
-        wordValue=(StringValue) record.getField(0, 0);
-        timestamp=(LongValue) record.getField(0, 1);
+        wordValue = (StringValue) record.getField(0, 0);
+        timestamp = (LongValue) record.getField(0, 1);
 
         if (wordCounts.containsKey(word)) {
             count = wordCounts.get(word) + 1;
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java
index 8d3ed3bcc4c10624e48954b35f6a3f79a1830d0a..13c23bbe19f659bd02442659bd9a76f0d15e6601 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountCounter.java
@@ -18,7 +18,6 @@ package eu.stratosphere.streaming.test.wordcount;
 import java.util.HashMap;
 import java.util.Map;
 
-import eu.stratosphere.streaming.api.AtomRecord;
 import eu.stratosphere.streaming.api.StreamRecord;
 import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
 import eu.stratosphere.types.IntValue;
@@ -30,14 +29,13 @@ public class WordCountCounter extends UserTaskInvokable {
     private StringValue wordValue = new StringValue("");
     private IntValue countValue = new IntValue(1);
     private String word = "";
-    private AtomRecord outputRecord = new AtomRecord(2);
     private int count = 1;
 
     @Override
     public void invoke(StreamRecord record) throws Exception {
         wordValue = (StringValue) record.getRecord(0).getField(0);
         word = wordValue.getValue();
-        
+
         if (wordCounts.containsKey(word)) {
             count = wordCounts.get(word) + 1;
             wordCounts.put(word, count);
@@ -46,8 +44,7 @@ public class WordCountCounter extends UserTaskInvokable {
             wordCounts.put(word, 1);
             countValue.setValue(1);
         }
-        outputRecord.setField(0, wordValue);
-        outputRecord.setField(1, countValue);
-        emit(new StreamRecord(outputRecord));
+        // TODO: object reuse
+        emit(new StreamRecord(wordValue, countValue));
     }
 }
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java
new file mode 100644
index 0000000000000000000000000000000000000000..8c97151d498da8735c9d82fb2208d5f32d412d98
--- /dev/null
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountLocal.java
@@ -0,0 +1,133 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.streaming.test.wordcount;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+
+import eu.stratosphere.client.minicluster.NepheleMiniCluster;
+import eu.stratosphere.configuration.Configuration;
+import eu.stratosphere.nephele.client.JobClient;
+import eu.stratosphere.nephele.jobgraph.JobGraph;
+import eu.stratosphere.streaming.api.JobGraphBuilder;
+import eu.stratosphere.types.StringValue;
+import eu.stratosphere.util.LogUtils;
+
+public class WordCountLocal {
+
+    private static final int MINIMUM_HEAP_SIZE_MB = 192;
+
+    protected final Configuration config;
+
+    private NepheleMiniCluster executor;
+
+    public WordCountLocal() {
+        this(new Configuration());
+    }
+
+    public WordCountLocal(Configuration config) {
+        verifyJvmOptions();
+        this.config = config;
+
+        LogUtils.initializeDefaultConsoleLogger(Level.WARN);
+    }
+
+    private void verifyJvmOptions() {
+        long heap = Runtime.getRuntime().maxMemory() >> 20;
+        Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+                + "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
+    }
+
+    public void startCluster() throws Exception {
+        this.executor = new NepheleMiniCluster();
+        this.executor.setDefaultOverwriteFiles(true);
+        this.executor.start();
+    }
+
+    public void stopCluster() throws Exception {
+        try {
+            if (this.executor != null) {
+                this.executor.stop();
+                this.executor = null;
+                FileSystem.closeAll();
+                System.gc();
+            }
+        } finally {
+        }
+    }
+
+    public void runJob() throws Exception {
+        // submit job
+        JobGraph jobGraph = null;
+        try {
+            jobGraph = getJobGraph();
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            Assert.fail("Failed to obtain JobGraph!");
+        }
+
+        Assert.assertNotNull("Obtained null JobGraph", jobGraph);
+
+        try {
+            JobClient client = null;
+            try {
+                client = this.executor.getJobClient(jobGraph);
+            } catch (Exception e) {
+                System.err.println("here");
+            }
+            client.submitJobAndWait();
+        } catch (Exception e) {
+            System.err.println(e.getMessage());
+            e.printStackTrace();
+            Assert.fail("Job execution failed!");
+        }
+    }
+
+    protected JobGraph getJobGraph() throws Exception {
+        JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
+        graphBuilder.setSource("WordCountSource", WordCountSource.class);
+        graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
+        graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
+        graphBuilder.setSink("WordCountSink", WordCountSink.class);
+
+        graphBuilder.broadcastConnect("WordCountSource", "WordCountSplitter");
+        graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
+                StringValue.class);
+        graphBuilder.broadcastConnect("WordCountCounter", "WordCountSink");
+
+        return graphBuilder.getJobGraph();
+    }
+
+    public static void main(String[] args) {
+        WordCountLocal wC = new WordCountLocal();
+
+        try {
+            wC.startCluster();
+            wC.runJob();
+            wC.stopCluster();
+        } catch (Exception e) {}
+
+    }
+}
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSink.java
index 8498c59187b7984bfd9106aa7d78424a3e70c163..921f49297bbe88988602fc944583c2efccee79ff 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSink.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSink.java
@@ -23,7 +23,7 @@ import eu.stratosphere.types.StringValue;
 public class WordCountSink implements UserSinkInvokable {
 
     private StringValue word = new StringValue("");
-    private IntValue count = new IntValue(1);
+    private IntValue count = new IntValue();
 
     @Override
     public void invoke(StreamRecord record) throws Exception {
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java
index 3a4439c51fc1ddaaf95e0f2e7c966eae354360e1..20059aa04073e2af9c93c93c136fba79b0cbbd95 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSource.java
@@ -15,23 +15,43 @@ package eu.stratosphere.streaming.test.wordcount;
 
-import eu.stratosphere.streaming.api.AtomRecord;
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+
 import eu.stratosphere.streaming.api.StreamRecord;
 import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
 import eu.stratosphere.types.StringValue;
+import eu.stratosphere.types.Value;
 
 public class WordCountSource extends UserSourceInvokable {
 
-    // private final String motto =
-    // "Stratosphere Big Data looks tiny from here";
-    private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
-    private StreamRecord mottoRecord;
+    private BufferedReader br = null;
+    private String line = new String();
+    private StringValue lineValue = new StringValue();
+    private Value[] hamletValues = new StringValue[1];
+    private StreamRecord hamletRecord = new StreamRecord(1);
+
+    public WordCountSource() {
+        try {
+            br = new BufferedReader(new FileReader(
+                    "src/test/resources/testdata/hamlet.txt"));
+        } catch (FileNotFoundException e) {
+            e.printStackTrace();
+        }
+    }
 
     @Override
     public void invoke() throws Exception {
-        mottoRecord = new StreamRecord(new AtomRecord(new StringValue(motto)));
-        for (int i = 0; i < 10000; i++) {
-            emit(mottoRecord);
+        line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
+        while (line != null) {
+            if (line != "") {
+                lineValue.setValue(line);
+                hamletValues[0] = lineValue;
+                // TODO: use hamletRecord instead
+                emit(new StreamRecord(hamletValues));
+            }
+            line = br.readLine();
         }
     }
 }
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java
index b961699f2ab04c0d89f995e15dfff0c008148940..1cbd5ca3494b950dd8573a25655c6d418b586ad3 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java
@@ -15,7 +15,6 @@ package eu.stratosphere.streaming.test.wordcount;
 
-import eu.stratosphere.streaming.api.AtomRecord;
 import eu.stratosphere.streaming.api.StreamRecord;
 import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
 import eu.stratosphere.types.StringValue;
@@ -25,18 +24,15 @@ public class WordCountSplitter extends UserTaskInvokable {
     private StringValue sentence = new StringValue("");
     private String[] words = new String[0];
     private StringValue wordValue = new StringValue("");
-    private AtomRecord outputRecord = new AtomRecord(wordValue);
 
     @Override
     public void invoke(StreamRecord record) throws Exception {
-        // record.getFieldInto(0, sentence);
         sentence = (StringValue) record.getRecord(0).getField(0);
         System.out.println("to split: " + sentence.getValue());
         words = sentence.getValue().split(" ");
         for (CharSequence word : words) {
             wordValue.setValue(word);
-            outputRecord.setField(0, wordValue);
-            emit(new StreamRecord(outputRecord));
+            emit(new StreamRecord(wordValue));
         }
     }
 }
\ No newline at end of file
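
The sketch below is not part of the patch; it only illustrates, under assumptions, how the FaultToleranceBuffer introduced above might be driven by a task: records are added to the buffer when they are emitted, acknowledgements decrement the per-record counter through ackRecord, and timeoutRecords is expected to be called periodically (the TODO in ackRecord hints at this) to fail and re-emit anything not fully acknowledged in time. The class name and helper method are hypothetical; the outputs list and channel ID are assumed to be the ones the task already owns, and the sketch lives in the same package because timeoutRecords is package-private.

    // Hypothetical usage sketch, not part of the patch.
    package eu.stratosphere.streaming.api;

    import java.util.List;

    import eu.stratosphere.nephele.io.RecordWriter;
    import eu.stratosphere.types.StringValue;

    public class FaultToleranceBufferSketch {

        public static void emitWithFaultTolerance(List<RecordWriter<StreamRecord>> outputs,
                String channelID) throws Exception {

            FaultToleranceBuffer buffer = new FaultToleranceBuffer(outputs, channelID);

            // Buffer the record before emitting so a missing ack can trigger a re-emit.
            StreamRecord record = new StreamRecord(new StringValue("hello")).setId(channelID);
            buffer.addRecord(record);
            for (RecordWriter<StreamRecord> output : outputs) {
                output.emit(record);
            }

            // Invoked when a downstream channel acknowledges the record; once all
            // outputs have acked, the record is dropped from the buffer.
            buffer.ackRecord(record.getId());

            // Invoked periodically: fails and re-emits every buffered record that
            // is older than the timeout, returning the affected record IDs.
            buffer.timeoutRecords(System.currentTimeMillis());
        }
    }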