提交 cef2a4d0 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Added WordCountDummySource for simpler testing

上级 38d108aa
......@@ -26,13 +26,14 @@ import java.util.TreeMap;
import eu.stratosphere.nephele.io.RecordWriter;
/** An object to provide fault tolerance for Stratosphere stream processing.
* It works as a buffer to hold StreamRecords for a task for re-emitting failed, or timed out records.
/**
* An object to provide fault tolerance for Stratosphere stream processing. It
* works as a buffer to hold StreamRecords for a task for re-emitting failed, or
* timed out records.
*/
public class FaultToleranceBuffer {
private long TIMEOUT = 1000;
private Long timeOfLastUpdate;
private Map<String, StreamRecord> recordBuffer;
private Map<String, Integer> ackCounter;
......@@ -44,12 +45,14 @@ public class FaultToleranceBuffer {
private int numberOfOutputs;
/**Creates fault tolerance buffer object for the given output channels and channel ID
/**
* Creates fault tolerance buffer object for the given output channels and
* channel ID
*
* @param outputs
* List of outputs
* List of outputs
* @param channelID
* ID of the task object that uses this buffer
* ID of the task object that uses this buffer
*/
public FaultToleranceBuffer(List<RecordWriter<StreamRecord>> outputs,
String channelID) {
......@@ -63,9 +66,11 @@ public class FaultToleranceBuffer {
this.recordTimestamps = new HashMap<String, Long>();
}
/** Adds the record to the fault tolerance buffer. This record will be monitored for acknowledgements and timeout.
*
*/
/**
* Adds the record to the fault tolerance buffer. This record will be
* monitored for acknowledgements and timeout.
*
*/
public void addRecord(StreamRecord streamRecord) {
recordBuffer.put(streamRecord.getId(), streamRecord);
......@@ -73,18 +78,20 @@ public class FaultToleranceBuffer {
addTimestamp(streamRecord.getId());
}
/** Checks for records that have timed out since the last check and fails them.
/**
* Checks for records that have timed out since the last check and fails
* them.
*
* @param currentTime
* Time when the check should be made, usually current system time.
* @return
* Returns the list of the records that have timed out.
* Time when the check should be made, usually current system
* time.
* @return Returns the list of the records that have timed out.
*/
List<String> timeoutRecords(Long currentTime) {
if (timeOfLastUpdate + TIMEOUT < currentTime) {
List<String> timedOutRecords = new LinkedList<String>();
Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L, currentTime
- TIMEOUT);
Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L,
currentTime - TIMEOUT);
for (Set<String> recordSet : timedOut.values()) {
if (!recordSet.isEmpty()) {
......@@ -105,13 +112,15 @@ public class FaultToleranceBuffer {
return null;
}
/**Stores time stamp for a record by recordID
* and also adds the record to a map which maps a time stamp to the IDs of records that were emitted at that time.
/**
* Stores time stamp for a record by recordID and also adds the record to a
* map which maps a time stamp to the IDs of records that were emitted at
* that time.
* <p>
* Later used for timeouts.
*
* @param recordID
* ID of the record
* ID of the record
*/
public void addTimestamp(String recordID) {
Long currentTime = System.currentTimeMillis();
......@@ -126,10 +135,11 @@ public class FaultToleranceBuffer {
}
}
/**Returns a StreamRecord after removing it from the buffer
/**
* Returns a StreamRecord after removing it from the buffer
*
* @param recordID
* The ID of the record that will be popped
* The ID of the record that will be popped
*/
public StreamRecord popRecord(String recordID) {
System.out.println("Pop ID: " + recordID);
......@@ -138,9 +148,12 @@ public class FaultToleranceBuffer {
return record;
}
/** Removes a StreamRecord by ID from the fault tolerance buffer, further acks will have no effects for this record.
/**
* Removes a StreamRecord by ID from the fault tolerance buffer, further
* acks will have no effects for this record.
*
* @param recordID
* The ID of the record that will be removed
* The ID of the record that will be removed
*
*/
void removeRecord(String recordID) {
......@@ -157,12 +170,14 @@ public class FaultToleranceBuffer {
}
}
/**Acknowledges the record of the given ID, if all the outputs have sent acknowledgments, removes it from the buffer
/**
* Acknowledges the record of the given ID, if all the outputs have sent
* acknowledgments, removes it from the buffer
*
* @param recordID
* ID of the record that has been acknowledged
* ID of the record that has been acknowledged
*/
//TODO: find a place to call timeoutRecords
// TODO: find a place to call timeoutRecords
public void ackRecord(String recordID) {
if (ackCounter.containsKey(recordID)) {
int ackCount = ackCounter.get(recordID) - 1;
......@@ -173,13 +188,15 @@ public class FaultToleranceBuffer {
ackCounter.put(recordID, ackCount);
}
}
//timeoutRecords(System.currentTimeMillis());
// timeoutRecords(System.currentTimeMillis());
}
/**Re-emits the failed record for the given ID, removes the old record and stores it with a new ID.
/**
* Re-emits the failed record for the given ID, removes the old record and
* stores it with a new ID.
*
* @param recordID
* ID of the record that has been failed
* ID of the record that has been failed
*/
public void failRecord(String recordID) {
// Create new id to avoid double counting acks
......@@ -188,12 +205,13 @@ public class FaultToleranceBuffer {
addRecord(newRecord);
reEmit(newRecord);
}
/**
* Emit give record to all output channels
* @param record
* Record to be re-emitted
*/
/**
* Emit give record to all output channels
*
* @param record
* Record to be re-emitted
*/
public void reEmit(StreamRecord record) {
for (RecordWriter<StreamRecord> output : outputs) {
try {
......
......@@ -25,7 +25,7 @@ public class WordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountSource.class);
graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
......
package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class WordCountDummySource extends UserSourceInvokable {
private String line = new String();
private StringValue lineValue = new StringValue();
private Value[] values = new StringValue[1];
public WordCountDummySource() {
line = "first second";
lineValue.setValue(line);
values[0] = lineValue;
}
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1; i++) {
emit(new StreamRecord(values));
System.out.println("xxxxxxxxx");
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.wordcount;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Level;
import org.junit.Assert;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.LogUtils;
public class WordCountLocal {
private static final int MINIMUM_HEAP_SIZE_MB = 192;
protected final Configuration config;
private NepheleMiniCluster executor;
public WordCountLocal() {
this(new Configuration());
}
public WordCountLocal(Configuration config) {
verifyJvmOptions();
this.config = config;
LogUtils.initializeDefaultConsoleLogger(Level.WARN);
}
private void verifyJvmOptions() {
long heap = Runtime.getRuntime().maxMemory() >> 20;
Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
public void startCluster() throws Exception {
this.executor = new NepheleMiniCluster();
this.executor.setDefaultOverwriteFiles(true);
this.executor.start();
}
public void stopCluster() throws Exception {
try {
if (this.executor != null) {
this.executor.stop();
this.executor = null;
FileSystem.closeAll();
System.gc();
}
} finally {
}
}
public void runJob() throws Exception {
// submit job
JobGraph jobGraph = null;
try {
jobGraph = getJobGraph();
}
catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Failed to obtain JobGraph!");
}
Assert.assertNotNull("Obtained null JobGraph", jobGraph);
try {
JobClient client = null;
try {
client = this.executor.getJobClient(jobGraph); }
catch(Exception e) {
System.err.println("here");
}
client.submitJobAndWait();
}
catch(Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Job execution failed!");
}
}
protected JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountSource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.broadcastConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
public static void main(String[] args){
WordCountLocal wC = new WordCountLocal();
try {
wC.startCluster();
wC.runJob();
wC.stopCluster();
} catch (Exception e) {}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册