Commit f42bfb22 authored by Gyula Fora, committed by Stephan Ewen

[streaming] wordcount moved out from testbase

Parent 9fc91543
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import eu.stratosphere.nephele.io.RecordWriter;
/**
* Provides fault tolerance for Stratosphere stream processing. It works as a
* buffer that holds the StreamRecords emitted by a task so that failed or
* timed-out records can be re-emitted.
*/
public class FaultToleranceBuffer {
private long TIMEOUT = 1000;
private Long timeOfLastUpdate;
private Map<String, StreamRecord> recordBuffer;
private Map<String, Integer> ackCounter;
private SortedMap<Long, Set<String>> recordsByTime;
private Map<String, Long> recordTimestamps;
private List<RecordWriter<StreamRecord>> outputs;
private final String channelID;
private int numberOfOutputs;
/**
* Creates a fault tolerance buffer for the given output channels and
* channel ID.
*
* @param outputs
* List of outputs
* @param channelID
* ID of the task object that uses this buffer
*/
public FaultToleranceBuffer(List<RecordWriter<StreamRecord>> outputs,
String channelID) {
this.timeOfLastUpdate = System.currentTimeMillis();
this.outputs = outputs;
this.recordBuffer = new HashMap<String, StreamRecord>();
this.ackCounter = new HashMap<String, Integer>();
this.numberOfOutputs = outputs.size();
this.channelID = channelID;
this.recordsByTime = new TreeMap<Long, Set<String>>();
this.recordTimestamps = new HashMap<String, Long>();
}
/**
* Adds the record to the fault tolerance buffer. The record will be
* monitored for acknowledgements and timeout.
*
* @param streamRecord
* Record to be monitored
*/
public void addRecord(StreamRecord streamRecord) {
recordBuffer.put(streamRecord.getId(), streamRecord);
ackCounter.put(streamRecord.getId(), numberOfOutputs);
addTimestamp(streamRecord.getId());
}
/**
* Checks for records that have timed out since the last check and fails
* them.
*
* @param currentTime
* Time when the check should be made, usually the current system time.
* @return List of records that timed out, or null if no check was due yet.
*/
List<String> timeoutRecords(Long currentTime) {
if (timeOfLastUpdate + TIMEOUT < currentTime) {
List<String> timedOutRecords = new LinkedList<String>();
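// Every bucket with a timestamp older than currentTime - TIMEOUT has timed out.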
Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L, currentTime
- TIMEOUT);
for (Set<String> recordSet : timedOut.values()) {
if (!recordSet.isEmpty()) {
for (String recordID : recordSet) {
timedOutRecords.add(recordID);
}
}
}
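// Removing the expired keys through keySet() also removes their buckets from recordsByTime.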
recordsByTime.keySet().removeAll(timedOut.keySet());
for (String recordID : timedOutRecords) {
failRecord(recordID);
}
timeOfLastUpdate = currentTime;
return timedOutRecords;
}
return null;
}
/**
* Stores the timestamp for a record by record ID and also adds the record
* to a map keyed by timestamp, holding the IDs of records emitted at that
* time.
* <p>
* Used later for timeouts.
*
* @param recordID
* ID of the record
*/
public void addTimestamp(String recordID) {
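// Bucket record IDs by emission time so that timeoutRecords can scan old buckets cheaply.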
Long currentTime = System.currentTimeMillis();
recordTimestamps.put(recordID, currentTime);
if (recordsByTime.containsKey(currentTime)) {
recordsByTime.get(currentTime).add(recordID);
} else {
Set<String> recordSet = new HashSet<String>();
recordSet.add(recordID);
recordsByTime.put(currentTime, recordSet);
}
}
/**
* Returns a StreamRecord after removing it from the buffer.
*
* @param recordID
* The ID of the record to pop
* @return The stored record
*/
public StreamRecord popRecord(String recordID) {
System.out.println("Pop ID: " + recordID);
StreamRecord record = recordBuffer.get(recordID);
removeRecord(recordID);
return record;
}
/**
* Removes a StreamRecord by ID from the fault tolerance buffer; further
* acks have no effect on this record.
*
* @param recordID
* The ID of the record to remove
*/
void removeRecord(String recordID) {
recordBuffer.remove(recordID);
ackCounter.remove(recordID);
try {
Long ts = recordTimestamps.remove(recordID);
recordsByTime.get(ts).remove(recordID);
} catch (NullPointerException e) {
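// The timestamp entry is already gone (record removed on another path); nothing to clean up.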
} catch (Exception e) {
e.printStackTrace();
System.out.println(recordID);
}
}
/**
* Acknowledges the record with the given ID; once all outputs have sent
* their acknowledgements, the record is removed from the buffer.
*
* @param recordID
* ID of the record that has been acknowledged
*/
// TODO: find a place to call timeoutRecords
public void ackRecord(String recordID) {
if (ackCounter.containsKey(recordID)) {
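// Count down from numberOfOutputs; the final ack removes the record for good.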
int ackCount = ackCounter.get(recordID) - 1;
if (ackCount == 0) {
removeRecord(recordID);
} else {
ackCounter.put(recordID, ackCount);
}
}
// timeoutRecords(System.currentTimeMillis());
}
/**
* Re-emits the record with the given ID, removing the old entry and
* storing the record again under a new ID.
*
* @param recordID
* ID of the record that failed
*/
public void failRecord(String recordID) {
// Create new id to avoid double counting acks
System.out.println("Fail ID: " + recordID);
StreamRecord newRecord = popRecord(recordID).setId(channelID);
addRecord(newRecord);
reEmit(newRecord);
}
/**
* Emits the given record to all output channels.
*
* @param record
* Record to be re-emitted
*/
public void reEmit(StreamRecord record) {
for (RecordWriter<StreamRecord> output : outputs) {
try {
output.emit(record);
System.out.println("Re-emitted");
} catch (Exception e) {
System.out.println("Re-emit failed");
}
}
}
public long getTIMEOUT() {
return this.TIMEOUT;
}
public void setTIMEOUT(long TIMEOUT) {
this.TIMEOUT = TIMEOUT;
}
public Map<String, StreamRecord> getRecordBuffer() {
return this.recordBuffer;
}
public Long getTimeOfLastUpdate() {
return this.timeOfLastUpdate;
}
public Map<String, Integer> getAckCounter() {
return this.ackCounter;
}
public SortedMap<Long, Set<String>> getRecordsByTime() {
return this.recordsByTime;
}
public Map<String, Long> getRecordTimestamps() {
return this.recordTimestamps;
}
public List<RecordWriter<StreamRecord>> getOutputs() {
return this.outputs;
}
public String getChannelID() {
return this.channelID;
}
public int getNumberOfOutputs() {
return this.numberOfOutputs;
}
void setNumberOfOutputs(int numberOfOutputs) {
this.numberOfOutputs = numberOfOutputs;
}
}
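For context, a minimal usage sketch of the buffer's intended lifecycle follows. It is hypothetical and not part of this commit; the outputs list (and the getOutputWriters helper) is assumed, while the StreamRecord and StringValue calls mirror the API used elsewhere in this change.

// Hypothetical sketch (not in this commit): assumes an outputs list from an
// assumed getOutputWriters() helper and the StreamRecord API used in this change.
List<RecordWriter<StreamRecord>> outputs = getOutputWriters();
FaultToleranceBuffer buffer = new FaultToleranceBuffer(outputs, "task-1");
StreamRecord record = new StreamRecord(new StringValue("hello")).setId("task-1");
buffer.addRecord(record); // start tracking acks and the timeout clock
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
}
buffer.ackRecord(record.getId()); // one ack per output; the last ack removes it
buffer.failRecord(record.getId()); // a failure re-emits it under a fresh ID
buffer.timeoutRecords(System.currentTimeMillis()); // fail anything older than TIMEOUT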
......@@ -27,15 +27,20 @@ public class WindowWordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1);
graphBuilder.setSource("WindowWordCountSource",
WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter",
WindowWordCountSplitter.class, 1);
graphBuilder.setTask("WindowWordCountCounter",
WindowWordCountCounter.class, 1);
graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class);
graphBuilder.broadcastConnect("WindowWordCountSource", "WindowWordCountSplitter");
graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WindowWordCountCounter", "WindowWordCountSink");
graphBuilder.broadcastConnect("WindowWordCountSource",
"WindowWordCountSplitter");
graphBuilder.fieldsConnect("WindowWordCountSplitter",
"WindowWordCountCounter", 0, StringValue.class);
graphBuilder.broadcastConnect("WindowWordCountCounter",
"WindowWordCountSink");
return graphBuilder.getJobGraph();
}
......
......@@ -26,10 +26,10 @@ import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize = 100;
private int slidingStep = 20;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
......@@ -40,8 +40,8 @@ public class WindowWordCountCounter extends UserTaskInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue=(StringValue) record.getField(0, 0);
timestamp=(LongValue) record.getField(0, 1);
wordValue = (StringValue) record.getField(0, 0);
timestamp = (LongValue) record.getField(0, 1);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
......
......@@ -18,7 +18,6 @@ package eu.stratosphere.streaming.test.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.IntValue;
......@@ -30,14 +29,13 @@ public class WordCountCounter extends UserTaskInvokable {
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private String word = "";
private AtomRecord outputRecord = new AtomRecord(2);
private int count = 1;
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue = (StringValue) record.getRecord(0).getField(0);
word = wordValue.getValue();
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......@@ -46,8 +44,7 @@ public class WordCountCounter extends UserTaskInvokable {
wordCounts.put(word, 1);
countValue.setValue(1);
}
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
emit(new StreamRecord(outputRecord));
// TODO: object reuse
emit(new StreamRecord(wordValue, countValue));
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.wordcount;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Level;
import org.junit.Assert;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.LogUtils;
public class WordCountLocal {
private static final int MINIMUM_HEAP_SIZE_MB = 192;
protected final Configuration config;
private NepheleMiniCluster executor;
public WordCountLocal() {
this(new Configuration());
}
public WordCountLocal(Configuration config) {
verifyJvmOptions();
this.config = config;
LogUtils.initializeDefaultConsoleLogger(Level.WARN);
}
private void verifyJvmOptions() {
long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
public void startCluster() throws Exception {
this.executor = new NepheleMiniCluster();
this.executor.setDefaultOverwriteFiles(true);
this.executor.start();
}
public void stopCluster() throws Exception {
if (this.executor != null) {
this.executor.stop();
this.executor = null;
FileSystem.closeAll();
System.gc();
}
}
public void runJob() throws Exception {
// submit job
JobGraph jobGraph = null;
try {
jobGraph = getJobGraph();
}
catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Failed to obtain JobGraph!");
}
Assert.assertNotNull("Obtained null JobGraph", jobGraph);
try {
JobClient client = null;
try {
client = this.executor.getJobClient(jobGraph);
} catch (Exception e) {
e.printStackTrace();
Assert.fail("Could not obtain JobClient: " + e.getMessage());
}
client.submitJobAndWait();
}
catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Job execution failed!");
}
}
protected JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountSource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.broadcastConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
WordCountLocal wC = new WordCountLocal();
try {
wC.startCluster();
wC.runJob();
wC.stopCluster();
} catch (Exception e) {
e.printStackTrace();
}
}
}
......@@ -23,7 +23,7 @@ import eu.stratosphere.types.StringValue;
public class WordCountSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
private IntValue count = new IntValue(1);
private IntValue count = new IntValue();
@Override
public void invoke(StreamRecord record) throws Exception {
......
......@@ -15,23 +15,43 @@
package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.streaming.api.AtomRecord;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class WordCountSource extends UserSourceInvokable {
// private final String motto =
// "Stratosphere Big Data looks tiny from here";
private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
private StreamRecord mottoRecord;
private BufferedReader br = null;
private String line = new String();
private StringValue lineValue = new StringValue();
private Value[] hamletValues = new StringValue[1];
private StreamRecord hamletRecord = new StreamRecord(1);
public WordCountSource() {
try {
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
@Override
public void invoke() throws Exception {
mottoRecord = new StreamRecord(new AtomRecord(new StringValue(motto)));
for (int i = 0; i < 10000; i++) {
emit(mottoRecord);
line = br.readLine();
while (line != null) {
// Strip punctuation from every line before it is split into words downstream.
line = line.replaceAll("[\\-\\+\\.\\^:,]", "");
if (!line.isEmpty()) {
lineValue.setValue(line);
hamletValues[0] = lineValue;
// TODO: use hamletRecord instead
emit(new StreamRecord(hamletValues));
}
line = br.readLine();
}
}
}
\ No newline at end of file
......@@ -15,7 +15,6 @@
package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.StringValue;
......@@ -25,18 +24,15 @@ public class WordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private AtomRecord outputRecord = new AtomRecord(wordValue);
@Override
public void invoke(StreamRecord record) throws Exception {
//record.getFieldInto(0, sentence);
sentence = (StringValue) record.getRecord(0).getField(0);
System.out.println("to split: " + sentence.getValue());
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
outputRecord.setField(0, wordValue);
emit(new StreamRecord(outputRecord));
emit(new StreamRecord(wordValue));
}
}
}
\ No newline at end of file