提交 3e2337e4 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] branch added for new api

上级 46990c82
......@@ -79,9 +79,7 @@ public class JobGraphBuilder {
numberOfOutputChannels = new HashMap<String, List<Integer>>();
maxParallelismVertexName = "";
maxParallelism = 0;
if (log.isDebugEnabled()) {
log.debug("JobGraph created");
}
log.debug("JobGraph created");
this.faultToleranceType = faultToleranceType;
}
......@@ -127,9 +125,7 @@ public class JobGraphBuilder {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
setComponent(sourceName, InvokableClass, parallelism, subtasksPerInstance, source);
if (log.isDebugEnabled()) {
log.debug("SOURCE: " + sourceName);
}
log.debug("SOURCE: " + sourceName);
}
/**
......@@ -164,9 +160,7 @@ public class JobGraphBuilder {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
setComponent(sourceName, InvokableObject, parallelism, subtasksPerInstance, source);
if (log.isDebugEnabled()) {
log.debug("SOURCE: " + sourceName);
}
log.debug("SOURCE: " + sourceName);
}
/**
......@@ -198,9 +192,7 @@ public class JobGraphBuilder {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, InvokableClass, parallelism, subtasksPerInstance, task);
if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName);
}
log.debug("TASK: " + taskName);
}
/**
......@@ -232,9 +224,7 @@ public class JobGraphBuilder {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, TaskInvokableObject, parallelism, subtasksPerInstance, task);
if (log.isDebugEnabled()) {
log.debug("TASK: " + taskName);
}
log.debug("TASK: " + taskName);
}
/**
......@@ -266,9 +256,7 @@ public class JobGraphBuilder {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, InvokableClass, parallelism, subtasksPerInstance, sink);
if (log.isDebugEnabled()) {
log.debug("SINK: " + sinkName);
}
log.debug("SINK: " + sinkName);
}
/**
......@@ -300,9 +288,7 @@ public class JobGraphBuilder {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, InvokableObject, parallelism, subtasksPerInstance, sink);
if (log.isDebugEnabled()) {
log.debug("SINK: " + sinkName);
}
log.debug("SINK: " + sinkName);
}
/**
......@@ -421,15 +407,11 @@ public class JobGraphBuilder {
config.setClass(
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
PartitionerClass);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
}
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
} catch (JobGraphDefinitionException e) {
if (log.isErrorEnabled()) {
log.error("Cannot connect components with " + PartitionerClass.getSimpleName()
+ " : " + upStreamComponentName + " -> " + downStreamComponentName, e);
}
log.error("Cannot connect components with " + PartitionerClass.getSimpleName() + " : "
+ upStreamComponentName + " -> " + downStreamComponentName, e);
}
}
......@@ -515,15 +497,11 @@ public class JobGraphBuilder {
+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);
addOutputChannels(upStreamComponentName, 1);
if (log.isDebugEnabled()) {
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
+ downStreamComponentName + ", KEY: " + keyPosition);
}
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
+ downStreamComponentName + ", KEY: " + keyPosition);
} catch (JobGraphDefinitionException e) {
if (log.isErrorEnabled()) {
log.error("Cannot connect components by field: " + upStreamComponentName + " to "
+ downStreamComponentName, e);
}
log.error("Cannot connect components by field: " + upStreamComponentName + " to "
+ downStreamComponentName, e);
}
}
......
......@@ -19,6 +19,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
......@@ -27,12 +28,8 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.MutableRecordReader;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.io.UnionRecordReader;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
......@@ -113,34 +110,20 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
throws StreamComponentException {
public void setConfigInputs(T taskBase, Configuration taskConfiguration,
List<StreamRecordReader<StreamRecord>> inputs) throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
if (numberOfInputs < 2) {
if (taskBase instanceof StreamTask) {
return new RecordReader<StreamRecord>((StreamTask) taskBase, StreamRecord.class);
inputs.add(new StreamRecordReader<StreamRecord>((StreamTask) taskBase,
StreamRecord.class));
} else if (taskBase instanceof StreamSink) {
return new RecordReader<StreamRecord>((StreamSink) taskBase, StreamRecord.class);
inputs.add(new StreamRecordReader<StreamRecord>((StreamSink) taskBase,
StreamRecord.class));
} else {
throw new StreamComponentException("Nonsupported object passed to setConfigInputs");
}
} else {
@SuppressWarnings("unchecked")
MutableRecordReader<StreamRecord>[] recordReaders = (MutableRecordReader<StreamRecord>[]) new MutableRecordReader<?>[numberOfInputs];
for (int i = 0; i < numberOfInputs; i++) {
if (taskBase instanceof StreamTask) {
recordReaders[i] = new MutableRecordReader<StreamRecord>((StreamTask) taskBase);
} else if (taskBase instanceof StreamSink) {
recordReaders[i] = new MutableRecordReader<StreamRecord>((StreamSink) taskBase);
} else {
throw new StreamComponentException(
"Nonsupported object passed to setConfigInputs");
}
}
return new UnionRecordReader<StreamRecord>(recordReaders, StreamRecord.class);
}
}
......@@ -178,20 +161,14 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
userFunctionSerialized));
userFunction = (UserSinkInvokable) ois.readObject();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: "
+ userFunctionClass.getSimpleName());
}
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
} else {
try {
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: "
+ userFunctionClass.getSimpleName());
}
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
......@@ -217,30 +194,19 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (StreamInvokableComponent) ois.readObject();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer,
faultToleranceType);
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: "
+ userFunctionClass.getSimpleName());
}
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
} else {
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer,
faultToleranceType);
userFunction.declareOutputs(outputs, instanceID, name, recordBuffer, faultToleranceType);
} catch (InstantiationException e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: "
+ userFunctionClass.getSimpleName());
}
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
}
log.error("Cannot use user function: " + userFunctionClass.getSimpleName());
}
}
......@@ -248,18 +214,16 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
// TODO find a better solution for this
public void threadSafePublish(AbstractTaskEvent event, AbstractRecordReader inputs)
public void threadSafePublish(AbstractTaskEvent event, StreamRecordReader<StreamRecord> input)
throws InterruptedException, IOException {
boolean concurrentModificationOccured = false;
while (!concurrentModificationOccured) {
try {
inputs.publishEvent(event);
input.publishEvent(event);
concurrentModificationOccured = true;
} catch (ConcurrentModificationException exeption) {
if (log.isTraceEnabled()) {
log.trace("Waiting to publish " + event.getClass());
}
log.trace("Waiting to publish " + event.getClass());
}
}
}
......@@ -279,62 +243,55 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
} else {
partitioners.add(partitioner.newInstance());
}
if (log.isTraceEnabled()) {
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs");
}
log.trace("Partitioner set: " + partitioner.getSimpleName() + " with " + nrOutput
+ " outputs");
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Error while setting partitioner: " + partitioner.getSimpleName()
+ " with " + nrOutput + " outputs", e);
}
log.error("Error while setting partitioner: " + partitioner.getSimpleName() + " with "
+ nrOutput + " outputs", e);
}
}
public void invokeRecords(RecordInvoker invoker, RecordInvokable userFunction,
AbstractRecordReader inputs, String name) throws Exception {
if (inputs instanceof UnionRecordReader) {
@SuppressWarnings("unchecked")
UnionRecordReader<StreamRecord> recordReader = (UnionRecordReader<StreamRecord>) inputs;
while (recordReader.hasNext()) {
StreamRecord record = recordReader.next();
invoker.call(name, userFunction, recordReader, record);
}
} else if (inputs instanceof RecordReader) {
@SuppressWarnings("unchecked")
RecordReader<StreamRecord> recordReader = (RecordReader<StreamRecord>) inputs;
while (recordReader.hasNext()) {
StreamRecord record = recordReader.next();
invoker.call(name, userFunction, recordReader, record);
public void invokeRecords(RecordInvoker invoker, RecordInvokable userFunction,
List<StreamRecordReader<StreamRecord>> inputs, String name) throws Exception {
List<StreamRecordReader<StreamRecord>> closedInputs = new LinkedList<StreamRecordReader<StreamRecord>>();
boolean hasInput = true;
while (hasInput) {
hasInput = false;
for (StreamRecordReader<StreamRecord> input : inputs) {
if (input.hasNext()) {
hasInput = true;
invoker.call(name, userFunction, input);
} else if (input.isInputClosed()) {
closedInputs.add(input);
}
}
inputs.removeAll(closedInputs);
}
}
public static interface RecordInvoker {
void call(String name, RecordInvokable userFunction, AbstractRecordReader inputs,
StreamRecord record) throws Exception;
void call(String name, RecordInvokable userFunction, StreamRecordReader<StreamRecord> input)
throws Exception;
}
public class InvokerWithFaultTolerance implements RecordInvoker {
@Override
public void call(String name, RecordInvokable userFunction, AbstractRecordReader inputs,
StreamRecord record) throws Exception {
public void call(String name, RecordInvokable userFunction,
StreamRecordReader<StreamRecord> input) throws Exception {
StreamRecord record = input.next();
UID id = record.getId();
userFunction.invoke(record);
threadSafePublish(new AckEvent(id), inputs);
if (log.isDebugEnabled()) {
log.debug("ACK: " + id + " -- " + name);
}
threadSafePublish(new AckEvent(id), input);
log.debug("ACK: " + id + " -- " + name);
}
}
public static class Invoker implements RecordInvoker {
@Override
public void call(String name, RecordInvokable userFunction, AbstractRecordReader inputs,
StreamRecord record) throws Exception {
public void call(String name, RecordInvokable userFunction,
StreamRecordReader<StreamRecord> input) throws Exception {
StreamRecord record = input.next();
userFunction.invoke(record);
}
}
......
......@@ -67,18 +67,15 @@ public abstract class StreamInvokableComponent implements Serializable {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
output.flush();
if (log.isInfoEnabled()) {
log.info("EMITTED: " + record.getId() + " -- " + name);
}
log.info("EMITTED: " + record.getId() + " -- " + name);
}
} catch (Exception e) {
if (useFaultTolerance) {
emittedRecords.failRecord(record.getId());
}
if (log.isWarnEnabled()) {
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
}
......@@ -91,9 +88,7 @@ public abstract class StreamInvokableComponent implements Serializable {
try {
outputs.get(outputChannel).emit(record);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("EMIT ERROR: " + e.getClass().getSimpleName() + " -- " + name);
}
log.warn("EMIT ERROR: " + e.getClass().getSimpleName() + " -- " + name);
}
}
......
......@@ -15,21 +15,24 @@
package eu.stratosphere.streaming.api.streamcomponent;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamComponentHelper.RecordInvoker;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
public class StreamSink extends AbstractOutputTask {
private static final Log log = LogFactory.getLog(StreamSink.class);
private AbstractRecordReader inputs;
private List<StreamRecordReader<StreamRecord>> inputs;
private UserSinkInvokable userFunction;
private StreamComponentHelper<StreamSink> streamSinkHelper;
private String name;
......@@ -37,6 +40,7 @@ public class StreamSink extends AbstractOutputTask {
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
inputs = new LinkedList<StreamRecordReader<StreamRecord>>();
userFunction = null;
streamSinkHelper = new StreamComponentHelper<StreamSink>();
}
......@@ -47,29 +51,22 @@ public class StreamSink extends AbstractOutputTask {
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try {
inputs = streamSinkHelper.getConfigInputs(this, taskConfiguration);
streamSinkHelper.setConfigInputs(this, taskConfiguration, inputs);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot register inputs", e);
}
log.error("Cannot register inputs", e);
}
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration
.getInteger("faultToleranceType", 0));
FaultToleranceType faultToleranceType = FaultToleranceType.from(taskConfiguration.getInteger("faultToleranceType", 0));
invoker = streamSinkHelper.getRecordInvoker(faultToleranceType);
userFunction = streamSinkHelper.getUserFunction(taskConfiguration);
}
@Override
public void invoke() throws Exception {
if (log.isDebugEnabled()) {
log.debug("SINK " + name + " invoked");
}
log.debug("SINK " + name + " invoked");
streamSinkHelper.invokeRecords(invoker, userFunction, inputs, name);
System.out.println("Result: " + userFunction.getResult());
if (log.isDebugEnabled()) {
log.debug("SINK " + name + " invoke finished");
}
System.out.println("Result: "+userFunction.getResult());
log.debug("SINK " + name + " invoke finished");
}
}
......@@ -73,9 +73,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
try {
streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
} catch (StreamComponentException e) {
if (log.isErrorEnabled()) {
log.error("Cannot register outputs", e);
}
log.error("Cannot register outputs", e);
}
int[] numberOfOutputChannels = new int[outputs.size()];
......@@ -83,8 +81,8 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
streamSourceHelper.setFaultTolerance(recordBuffer, faultToleranceType, taskConfiguration,
outputs, sourceInstanceID, name, numberOfOutputChannels);
streamSourceHelper.setFaultTolerance(recordBuffer, faultToleranceType,
taskConfiguration, outputs, sourceInstanceID, name, numberOfOutputChannels);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(taskConfiguration,
outputs, sourceInstanceID, name, recordBuffer);
......@@ -94,9 +92,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
@Override
public void invoke() throws Exception {
if (log.isDebugEnabled()) {
log.debug("SOURCE " + name + " invoked with instance id " + sourceInstanceID);
}
log.debug("SOURCE " + name + " invoked with instance id " + sourceInstanceID);
userFunction.invoke();
// TODO print to file
System.out.println(userFunction.getResult());
......
......@@ -22,7 +22,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.AbstractRecordReader;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
......@@ -36,7 +35,7 @@ public class StreamTask extends AbstractTask {
private static final Log log = LogFactory.getLog(StreamTask.class);
private AbstractRecordReader inputs;
private List<StreamRecordReader<StreamRecord>> inputs;
private List<RecordWriter<StreamRecord>> outputs;
private List<ChannelSelector<StreamRecord>> partitioners;
private UserTaskInvokable userFunction;
......@@ -52,6 +51,7 @@ public class StreamTask extends AbstractTask {
public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here
inputs = new LinkedList<StreamRecordReader<StreamRecord>>();
outputs = new LinkedList<RecordWriter<StreamRecord>>();
partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null;
......@@ -66,12 +66,10 @@ public class StreamTask extends AbstractTask {
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
try {
inputs = streamTaskHelper.getConfigInputs(this, taskConfiguration);
streamTaskHelper.setConfigInputs(this, taskConfiguration, inputs);
streamTaskHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
} catch (StreamComponentException e) {
if (log.isErrorEnabled()) {
log.error("Cannot register inputs/outputs for " + getClass().getSimpleName(), e);
}
log.error("Cannot register inputs/outputs for " + getClass().getSimpleName(), e);
}
int[] numberOfOutputChannels = new int[outputs.size()];
......@@ -91,15 +89,11 @@ public class StreamTask extends AbstractTask {
@Override
public void invoke() throws Exception {
if (log.isDebugEnabled()) {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
}
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
streamTaskHelper.invokeRecords(invoker, userFunction, inputs, name);
// TODO print to file
System.out.println(userFunction.getResult());
if (log.isDebugEnabled()) {
log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
}
log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
}
}
\ No newline at end of file
......@@ -15,15 +15,17 @@
package eu.stratosphere.streaming.examples.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
public class WordCountCounter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>();
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = "";
private Integer count = 0;
......@@ -45,7 +47,7 @@ public class WordCountCounter extends UserTaskInvokable {
outRecord.setInteger(1, count);
emit(outRecord);
performanceCounter.count();
// performanceCounter.count();
}
......
......@@ -48,7 +48,7 @@ public class WordCountSourceSplitter extends UserSourceInvokable {
outRecord.setString(0, word);
System.out.println("word=" + word);
emit(outRecord);
performanceCounter.count();
//performanceCounter.count();
}
}
}
......
......@@ -64,10 +64,8 @@ public class AckEventListener implements EventListener {
Long nt = System.nanoTime();
recordBuffer.ackRecord(ackEvent.getRecordId(), output);
if (log.isDebugEnabled()) {
log.debug("ACK PROCESSED: " + output + " " + ackEvent.getRecordId()
+ " exec. time (ns): " + (System.nanoTime() - nt));
}
log.debug("ACK PROCESSED: " + output + " " + ackEvent.getRecordId()
+ " exec. time (ns): " + (System.nanoTime() - nt));
}
}
}
......@@ -62,9 +62,7 @@ public class FailEventListener implements EventListener {
int failCID = recordId.getChannelId();
if (failCID == taskInstanceID) {
recordBuffer.failRecord(recordId, output);
if (log.isWarnEnabled()) {
log.warn("FAIL RECIEVED: " + output + " " + recordId);
}
log.warn("FAIL RECIEVED: "+output +" "+ recordId);
}
}
......
......@@ -70,9 +70,7 @@ public abstract class FaultToleranceBuffer {
addTimestamp(id);
addToAckCounter(id);
if (log.isTraceEnabled()) {
log.trace("Record added to buffer: " + id);
}
log.trace("Record added to buffer: " + id);
}
public void add(StreamRecord streamRecord, int channel) {
......@@ -85,9 +83,7 @@ public abstract class FaultToleranceBuffer {
addToAckCounter(id, channel);
if (log.isTraceEnabled()) {
log.trace("Record added to buffer: " + id);
}
log.trace("Record added to buffer: " + id);
}
protected abstract void addToAckCounter(UID id);
......@@ -135,14 +131,10 @@ public abstract class FaultToleranceBuffer {
recordsByTime.get(recordTimestamps.remove(uid)).remove(uid);
if (log.isTraceEnabled()) {
log.trace("Record removed from buffer: " + uid);
}
log.trace("Record removed from buffer: " + uid);
return recordBuffer.remove(uid);
} else {
if (log.isWarnEnabled()) {
log.warn("Record ALREADY REMOVED from buffer: " + uid);
}
log.warn("Record ALREADY REMOVED from buffer: " + uid);
return null;
}
......@@ -151,9 +143,7 @@ public abstract class FaultToleranceBuffer {
// TODO:test this
public List<UID> timeoutRecords(Long currentTime) {
if (timeOfLastUpdate + timeout < currentTime) {
if (log.isTraceEnabled()) {
log.trace("Updating record buffer");
}
log.trace("Updating record buffer");
List<UID> timedOutRecords = new LinkedList<UID>();
Map<Long, Set<UID>> timedOut = recordsByTime.subMap(0L, currentTime - timeout);
......
......@@ -39,7 +39,7 @@ public class FaultToleranceUtil {
private final int componentID;
private int numberOfChannels;
private FaultToleranceBuffer buffer;
public FaultToleranceType type;
public PerformanceTracker tracker;
......@@ -58,22 +58,20 @@ public class FaultToleranceUtil {
*/
// TODO:update logs for channel
// acks and fails
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs,
int sourceInstanceID, int[] numberOfChannels) {
public FaultToleranceUtil(FaultToleranceType type, List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
this.type = type;
switch (type) {
case EXACTLY_ONCE:
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
break;
case AT_LEAST_ONCE:
case NONE:
default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
case AT_LEAST_ONCE: case NONE: default:
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
tracker = new PerformanceTracker("pc", 1000, 1000, 30000,
......@@ -192,14 +190,9 @@ public class FaultToleranceUtil {
for (RecordWriter<StreamRecord> output : outputs) {
try {
output.emit(record);
if (log.isWarnEnabled()) {
log.warn("RE-EMITTED: " + record.getId());
}
log.warn("RE-EMITTED: " + record.getId());
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("RE-EMIT FAILED, avoiding record: " + record.getId());
}
log.error("RE-EMIT FAILED, avoiding record: " + record.getId());
}
}
......@@ -217,13 +210,9 @@ public class FaultToleranceUtil {
{
try {
outputs.get(outputChannel).emit(record);
if (log.isWarnEnabled()) {
log.warn("RE-EMITTED: " + record.getId() + " " + outputChannel);
}
log.warn("RE-EMITTED: " + record.getId() + " " + outputChannel);
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("RE-EMIT FAILED, avoiding record: " + record.getId());
}
log.error("RE-EMIT FAILED, avoiding record: " + record.getId());
}
}
......
......@@ -91,31 +91,6 @@ public class StreamComponentTest {
}
}
public static class MyOtherTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
String out;
public MyOtherTask() {
}
public MyOtherTask(String string) {
out = string;
}
@Override
public void invoke(StreamRecord record) throws Exception {
Integer i = record.getInteger(0);
emit(new StreamRecord(new Tuple2<Integer, Integer>(-i - 1, -i - 2)));
}
@Override
public String getResult() {
return out;
}
}
public static class MySink extends UserSinkInvokable {
private static final long serialVersionUID = 1L;
......@@ -145,13 +120,10 @@ public class StreamComponentTest {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);
graphBuilder.setSource("MySource", new MySource("source"));
graphBuilder.setTask("MyTask", new MyTask("task"), 1, 1);
graphBuilder.setTask("MyOtherTask", new MyOtherTask("otherTask"), 1, 1);
graphBuilder.setTask("MyTask", new MyTask("task"), 2, 2);
graphBuilder.setSink("MySink", new MySink("sink"));
graphBuilder.shuffleConnect("MySource", "MyTask");
graphBuilder.shuffleConnect("MySource", "MyOtherTask");
graphBuilder.shuffleConnect("MyOtherTask", "MySink");
graphBuilder.shuffleConnect("MyTask", "MySink");
ClusterUtil.runOnMiniCluster(graphBuilder.getJobGraph());
......@@ -159,14 +131,10 @@ public class StreamComponentTest {
@Test
public void test() {
assertEquals(200, data.keySet().size());
assertEquals(100, data.keySet().size());
for (Integer k : data.keySet()) {
if (k < 0) {
assertEquals((Integer) (k - 1), data.get(k));
} else {
assertEquals((Integer) (k + 1), data.get(k));
}
assertEquals((Integer) (k + 1), data.get(k));
}
}
}
package eu.stratosphere.streaming.api.streamrecord;
import java.io.Serializable;
public abstract class MyGeneric<IN> implements Serializable {
public abstract void asd();
}
package eu.stratosphere.streaming.api.streamrecord;
public class MyGeneric2 extends MyGeneric<Integer>{
@Override
public void asd() {
// TODO Auto-generated method stub
}
}
......@@ -26,6 +26,8 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
......@@ -258,47 +260,56 @@ public class StreamRecordTest {
fail();
} catch (NoSuchFieldException e) {
}
try {
a.getBoolean(0);
fail();
} catch (FieldTypeMismatchException e) {}
} catch (FieldTypeMismatchException e) {
}
try {
a.getByte(0);
fail();
} catch (FieldTypeMismatchException e) {}
} catch (FieldTypeMismatchException e) {
}
try {
a.getCharacter(0);
fail();
} catch (FieldTypeMismatchException e) {}
} catch (FieldTypeMismatchException e) {
}
try {
a.getDouble(0);
fail();
} catch (FieldTypeMismatchException e) {}
} catch (FieldTypeMismatchException e) {
}
try {
a.getFloat(0);
fail();
} catch (FieldTypeMismatchException e) {}
} catch (FieldTypeMismatchException e) {
}
try {
a.getInteger(0);
fail();
} catch (FieldTypeMismatchException e) {}
} catch (FieldTypeMismatchException e) {
}
try {
a.getLong(0);
fail();
} catch (FieldTypeMismatchException e) {}
} catch (FieldTypeMismatchException e) {
}
try {
a.getShort(0);
fail();
} catch (FieldTypeMismatchException e) {}
} catch (FieldTypeMismatchException e) {
}
StreamRecord c = new StreamRecord(new Tuple1<Integer>(1));
try {
c.getString(0);
fail();
} catch (FieldTypeMismatchException e) {}
} catch (FieldTypeMismatchException e) {
}
}
@Test
......@@ -490,4 +501,24 @@ public class StreamRecordTest {
System.out.println(Arrays.toString(ols.estimateRegressionParameters()));
}
@Test
public void typeExtractTest() throws IOException, ClassNotFoundException {
ByteArrayOutputStream buff = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(buff);
MyGeneric<?> g = new MyGeneric2();
out.writeObject(g);
ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(buff.toByteArray()));
MyGeneric<?> f = (MyGeneric<?>) in.readObject();
TypeInformation<?> ti = TypeExtractor.createTypeInfo(MyGeneric.class, f.getClass(), 0,
null, null);
System.out.println("Type info: " + ti);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册