提交 bd81e2c3 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] StreamRecord copy quickfix

上级 700b38d7
......@@ -23,30 +23,32 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
/**TaskEvent for sending record acknowledgements to the input's fault tolerance buffer
/**
* TaskEvent for sending record acknowledgements to the input's fault tolerance
* buffer
*
*
*
*/
public class AckEvent extends AbstractTaskEvent {
private static final Log log = LogFactory.getLog(AckEvent.class);
private String recordId;
/**Creates a new event to acknowledge the record with the given ID
/**
* Creates a new event to acknowledge the record with the given ID
*
* @param recordId
* ID of the record to be acknowledged
* ID of the record to be acknowledged
*/
public AckEvent(String recordId) {
setRecordId(recordId);
if (log.isDebugEnabled()) {
log.debug("Ack sent " + recordId);
}
// System.out.println("Ack sent " + recordId);
// System.out.println("---------------------");
}
@Override
......
......@@ -248,6 +248,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public StreamRecord copy() {
StreamRecord copiedRecord = new StreamRecord(this.numOfFields, this.numOfRecords);
copiedRecord.uid = this.uid;
for (Value[] record : recordBatch) {
copiedRecord.recordBatch.add(record);
}
......
......@@ -15,6 +15,8 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokable;
public abstract class UserSourceInvokable extends StreamInvokable {
public void invoke() throws Exception {
......
......@@ -16,6 +16,7 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.streamcomponent.StreamInvokable;
public abstract class UserTaskInvokable extends StreamInvokable {
......
......@@ -19,7 +19,6 @@ import eu.stratosphere.streaming.api.FaultToleranceBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
......
package eu.stratosphere.streaming.api.invokable;
package eu.stratosphere.streaming.api.streamcomponent;
import java.util.List;
......@@ -29,10 +29,10 @@ public abstract class StreamInvokable {
try {
output.emit(record);
System.out.println(this.getClass().getName());
System.out.println("Emitted " + record.getId() + "-"
+ record.toString());
System.out.println("---------------------");
// System.out.println(this.getClass().getName());
// System.out.println("Emitted " + record.getId() + "-"
// + record.toString());
// System.out.println("---------------------");
} catch (Exception e) {
System.out.println("Emit error: " + e.getMessage());
......
......@@ -80,8 +80,8 @@ public class StreamSource extends AbstractInputTask<RandIS> {
public void invoke() throws Exception {
userFunction.invoke();
System.out.println(this.getClass().getName() + "-" + sourceInstanceID);
System.out.println("---------------------");
// System.out.println(this.getClass().getName() + "-" + sourceInstanceID);
// System.out.println("---------------------");
}
......
......@@ -15,6 +15,11 @@
package eu.stratosphere.streaming.test.wordcount;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
......@@ -24,6 +29,15 @@ public class WordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout(
"%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(Level.DEBUG);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
......
......@@ -16,6 +16,7 @@
package eu.stratosphere.streaming.test.wordcount;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.junit.Assert;
......@@ -106,7 +107,7 @@ public class WordCountLocal {
protected JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountSource.class);
graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
......@@ -123,6 +124,7 @@ public class WordCountLocal {
public static void main(String[] args){
WordCountLocal wC = new WordCountLocal();
BasicConfigurator.configure();
try {
wC.startCluster();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册