提交 a3164ab8 编写于 作者: G ghermann 提交者: Stephan Ewen

[streaming] Replaced StreamComponentHelper with AbstractStreamComponent

上级 b44650b0
......@@ -17,7 +17,7 @@ package eu.stratosphere.streaming.api.invokable;
import java.io.Serializable;
public abstract class StreamComponent implements Serializable {
public abstract class StreamComponentInvokable implements Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -20,7 +20,7 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.util.Collector;
public abstract class StreamRecordInvokable<IN extends Tuple, OUT extends Tuple> extends
StreamComponent {
StreamComponentInvokable {
public abstract void invoke(StreamRecord record, Collector<OUT> collector)
throws Exception;
}
......@@ -20,7 +20,7 @@ import java.io.Serializable;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.util.Collector;
public abstract class UserSourceInvokable<OUT extends Tuple> extends StreamComponent implements
public abstract class UserSourceInvokable<OUT extends Tuple> extends StreamComponentInvokable implements
Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -16,10 +16,8 @@
package eu.stratosphere.streaming.api.streamcomponent;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import org.apache.commons.logging.Log;
......@@ -35,8 +33,6 @@ import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.api.java.typeutils.runtime.TupleSerializer;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
......@@ -46,28 +42,17 @@ import eu.stratosphere.runtime.io.api.MutableRecordReader;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.SinkFunction;
import eu.stratosphere.streaming.api.StreamCollectorManager;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamComponent;
import eu.stratosphere.streaming.api.invokable.StreamComponentInvokable;
import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.ArrayStreamRecord;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.AckEvent;
import eu.stratosphere.streaming.faulttolerance.AckEventListener;
import eu.stratosphere.streaming.faulttolerance.FailEvent;
import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.util.Collector;
public final class StreamComponentHelper {
private static final Log log = LogFactory.getLog(StreamComponentHelper.class);
private static int numComponents = 0;
public abstract class AbstractStreamComponent extends AbstractInvokable {
private final Log log = LogFactory.getLog(AbstractStreamComponent.class);
private TupleTypeInfo<Tuple> inTupleTypeInfo = null;
private TupleSerializer<Tuple> inTupleSerializer = null;
......@@ -77,7 +62,6 @@ public final class StreamComponentHelper {
private TupleSerializer<Tuple> outTupleSerializer = null;
private SerializationDelegate<Tuple> outSerializationDelegate = null;
public Collector<Tuple> collector;
private List<Integer> batchSizesNotPartitioned = new ArrayList<Integer>();
private List<Integer> batchSizesPartitioned = new ArrayList<Integer>();
private List<Integer> numOfOutputsPartitioned = new ArrayList<Integer>();
......@@ -86,49 +70,34 @@ public final class StreamComponentHelper {
private List<RecordWriter<StreamRecord>> outputsNotPartitioned = new ArrayList<RecordWriter<StreamRecord>>();
private List<RecordWriter<StreamRecord>> outputsPartitioned = new ArrayList<RecordWriter<StreamRecord>>();
public static int newComponent() {
protected Configuration configuration;
protected Collector<Tuple> collector;
protected int instanceID;
protected String name;
private static int numComponents = 0;
protected static int newComponent() {
numComponents++;
return numComponents;
}
public void setAckListener(FaultToleranceUtil recordBuffer, int sourceInstanceID,
List<RecordWriter<StreamRecord>> outputs) {
EventListener[] ackListeners = new EventListener[outputs.size()];
for (int i = 0; i < outputs.size(); i++) {
ackListeners[i] = new AckEventListener(sourceInstanceID, recordBuffer, i);
outputs.get(i).subscribeToEvent(ackListeners[i], AckEvent.class);
}
}
public void setFailListener(FaultToleranceUtil recordBuffer, int sourceInstanceID,
List<RecordWriter<StreamRecord>> outputs) {
EventListener[] failListeners = new EventListener[outputs.size()];
for (int i = 0; i < outputs.size(); i++) {
failListeners[i] = new FailEventListener(sourceInstanceID, recordBuffer, i);
outputs.get(i).subscribeToEvent(failListeners[i], FailEvent.class);
}
protected void initialize() {
configuration = getTaskConfiguration();
name = configuration.getString("componentName", "MISSING_COMPONENT_NAME");
}
public Collector<Tuple> setCollector(Configuration taskConfiguration, int id,
List<RecordWriter<StreamRecord>> outputs) {
long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000);
protected Collector<Tuple> setCollector(List<RecordWriter<StreamRecord>> outputs) {
long batchTimeout = configuration.getLong("batchTimeout", 1000);
collector = new StreamCollectorManager<Tuple>(batchSizesNotPartitioned,
batchSizesPartitioned, numOfOutputsPartitioned, keyPosition, batchTimeout, id,
outSerializationDelegate, outputsPartitioned, outputsNotPartitioned);
batchSizesPartitioned, numOfOutputsPartitioned, keyPosition, batchTimeout,
instanceID, outSerializationDelegate, outputsPartitioned, outputsNotPartitioned);
return collector;
}
public void setSerializers(Configuration taskConfiguration) {
byte[] operatorBytes = taskConfiguration.getBytes("operator", null);
String operatorName = taskConfiguration.getString("operatorName", "");
protected void setSerializers() {
byte[] operatorBytes = configuration.getBytes("operator", null);
String operatorName = configuration.getString("operatorName", "");
Object function = null;
try {
......@@ -185,7 +154,7 @@ public final class StreamComponentHelper {
outSerializationDelegate = new SerializationDelegate<Tuple>(outTupleSerializer);
}
public void setSinkSerializer() {
protected void setSinkSerializer() {
if (outSerializationDelegate != null) {
inTupleTypeInfo = outTupleTypeInfo;
......@@ -194,13 +163,13 @@ public final class StreamComponentHelper {
}
}
public AbstractRecordReader getConfigInputs(AbstractInvokable taskBase,
Configuration taskConfiguration) throws StreamComponentException {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
protected AbstractRecordReader getConfigInputs()
throws StreamComponentException {
int numberOfInputs = configuration.getInteger("numberOfInputs", 0);
if (numberOfInputs < 2) {
return new StreamRecordReader(taskBase, ArrayStreamRecord.class,
return new StreamRecordReader(this, ArrayStreamRecord.class,
inDeserializationDelegate, inTupleSerializer);
} else {
......@@ -209,7 +178,7 @@ public final class StreamComponentHelper {
for (int i = 0; i < numberOfInputs; i++) {
recordReaders[i] = new MutableRecordReader<StreamRecord>(taskBase);
recordReaders[i] = new MutableRecordReader<StreamRecord>(this);
}
return new UnionStreamRecordReader(recordReaders, ArrayStreamRecord.class,
......@@ -217,17 +186,16 @@ public final class StreamComponentHelper {
}
}
public void setConfigOutputs(AbstractInvokable taskBase, Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs,
protected void setConfigOutputs(List<RecordWriter<StreamRecord>> outputs,
List<ChannelSelector<StreamRecord>> partitioners) throws StreamComponentException {
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
int numberOfOutputs = configuration.getInteger("numberOfOutputs", 0);
for (int i = 0; i < numberOfOutputs; i++) {
setPartitioner(taskConfiguration, i, partitioners);
setPartitioner(i, partitioners);
ChannelSelector<StreamRecord> outputPartitioner = partitioners.get(i);
outputs.add(new RecordWriter<StreamRecord>(taskBase, outputPartitioner));
outputs.add(new RecordWriter<StreamRecord>(this, outputPartitioner));
if (outputsPartitioned.size() < batchSizesPartitioned.size()) {
outputsPartitioned.add(outputs.get(i));
......@@ -237,93 +205,22 @@ public final class StreamComponentHelper {
}
}
/**
* Reads and creates a StreamComponent from the config.
*
* @param userFunctionClass
* Class of the invokable function
* @param config
* Configuration object
* @return The StreamComponent object
*/
private StreamComponent getInvokable(Class<? extends StreamComponent> userFunctionClass,
Configuration config) {
StreamComponent userFunction = null;
byte[] userFunctionSerialized = config.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (StreamComponent) ois.readObject();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
return userFunction;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public UserSinkInvokable<Tuple> getSinkInvokable(Configuration config) {
Class<? extends UserSinkInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSinkInvokable.class, UserSinkInvokable.class);
return (UserSinkInvokable<Tuple>) getInvokable(userFunctionClass, config);
}
// TODO consider logging stack trace!
@SuppressWarnings({ "rawtypes", "unchecked" })
public UserTaskInvokable<Tuple, Tuple> getTaskInvokable(Configuration config) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserTaskInvokable> userFunctionClass = config.getClass("userfunction",
DefaultTaskInvokable.class, UserTaskInvokable.class);
return (UserTaskInvokable<Tuple, Tuple>) getInvokable(userFunctionClass, config);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public UserSourceInvokable<Tuple> getSourceInvokable(Configuration config) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserSourceInvokable> userFunctionClass = config.getClass("userfunction",
DefaultSourceInvokable.class, UserSourceInvokable.class);
return (UserSourceInvokable<Tuple>) getInvokable(userFunctionClass, config);
}
// TODO find a better solution for this
public void threadSafePublish(AbstractTaskEvent event, AbstractRecordReader inputs)
throws InterruptedException, IOException {
boolean concurrentModificationOccured = false;
while (!concurrentModificationOccured) {
try {
inputs.publishEvent(event);
concurrentModificationOccured = true;
} catch (ConcurrentModificationException exeption) {
if (log.isTraceEnabled()) {
log.trace("Waiting to publish " + event.getClass());
}
}
}
}
private void setPartitioner(Configuration config, int numberOfOutputs,
private void setPartitioner(int numberOfOutputs,
List<ChannelSelector<StreamRecord>> partitioners) {
Class<? extends ChannelSelector<StreamRecord>> partitioner = config.getClass(
Class<? extends ChannelSelector<StreamRecord>> partitioner = configuration.getClass(
"partitionerClass_" + numberOfOutputs, DefaultPartitioner.class,
ChannelSelector.class);
Integer batchSize = config.getInteger("batchSize_" + numberOfOutputs, 1);
Integer batchSize = configuration.getInteger("batchSize_" + numberOfOutputs, 1);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
batchSizesPartitioned.add(batchSize);
numOfOutputsPartitioned.add(config
numOfOutputsPartitioned.add(configuration
.getInteger("numOfOutputs_" + numberOfOutputs, -1));
// TODO:force one partitioning field
keyPosition = config.getInteger("partitionerIntParam_" + numberOfOutputs, 1);
keyPosition = configuration.getInteger("partitionerIntParam_" + numberOfOutputs, 1);
partitioners.add(partitioner.getConstructor(int.class).newInstance(keyPosition));
......@@ -343,7 +240,7 @@ public final class StreamComponentHelper {
}
}
public void invokeRecords(StreamRecordInvokable<Tuple, Tuple> userFunction,
protected void invokeRecords(StreamRecordInvokable<Tuple, Tuple> userFunction,
AbstractRecordReader inputs) throws Exception {
if (inputs instanceof UnionStreamRecordReader) {
UnionStreamRecordReader recordReader = (UnionStreamRecordReader) inputs;
......@@ -361,5 +258,77 @@ public final class StreamComponentHelper {
}
}
}
/**
* Reads and creates a StreamComponent from the config.
*
* @param userFunctionClass
* Class of the invokable function
* @param configuration
* Configuration object
* @return The StreamComponent object
*/
protected StreamComponentInvokable getInvokable(Class<? extends StreamComponentInvokable> userFunctionClass) {
StreamComponentInvokable userFunction = null;
byte[] userFunctionSerialized = configuration.getBytes("serializedudf", null);
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
userFunctionSerialized));
userFunction = (StreamComponentInvokable) ois.readObject();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot instanciate user function: " + userFunctionClass.getSimpleName());
}
}
}
\ No newline at end of file
return userFunction;
}
protected abstract void setInvokable();
// protected void threadSafePublish(AbstractTaskEvent event,
// AbstractRecordReader inputs)
// throws InterruptedException, IOException {
//
// boolean concurrentModificationOccured = false;
// while (!concurrentModificationOccured) {
// try {
// inputs.publishEvent(event);
// concurrentModificationOccured = true;
// } catch (ConcurrentModificationException exeption) {
// if (log.isTraceEnabled()) {
// log.trace("Waiting to publish " + event.getClass());
// }
// }
// }
// }
//
// protected void setAckListener(FaultToleranceUtil recordBuffer, int
// sourceInstanceID,
// List<RecordWriter<StreamRecord>> outputs) {
//
// EventListener[] ackListeners = new EventListener[outputs.size()];
//
// for (int i = 0; i < outputs.size(); i++) {
// ackListeners[i] = new AckEventListener(sourceInstanceID, recordBuffer,
// i);
// outputs.get(i).subscribeToEvent(ackListeners[i], AckEvent.class);
// }
//
// }
//
// protected void setFailListener(FaultToleranceUtil recordBuffer, int
// sourceInstanceID,
// List<RecordWriter<StreamRecord>> outputs) {
//
// EventListener[] failListeners = new EventListener[outputs.size()];
//
// for (int i = 0; i < outputs.size(); i++) {
// failListeners[i] = new FailEventListener(sourceInstanceID, recordBuffer,
// i);
// outputs.get(i).subscribeToEvent(failListeners[i], FailEvent.class);
// }
// }
}
......@@ -19,35 +19,31 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
public class StreamSink extends AbstractInvokable {
public class StreamSink extends AbstractStreamComponent {
private static final Log log = LogFactory.getLog(StreamSink.class);
private AbstractRecordReader inputs;
private UserSinkInvokable<Tuple> userFunction;
private StreamComponentHelper streamSinkHelper;
private String name;
private StreamRecordInvokable<Tuple, Tuple> userFunction;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
userFunction = null;
streamSinkHelper = new StreamComponentHelper();
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
initialize();
try {
streamSinkHelper.setSerializers(taskConfiguration);
streamSinkHelper.setSinkSerializer();
inputs = streamSinkHelper.getConfigInputs(this, taskConfiguration);
setSerializers();
setSinkSerializer();
inputs = getConfigInputs();
} catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Cannot register inputs", e);
......@@ -58,7 +54,15 @@ public class StreamSink extends AbstractInvokable {
// FaultToleranceType.from(taskConfiguration
// .getInteger("faultToleranceType", 0));
userFunction = streamSinkHelper.getSinkInvokable(taskConfiguration);
setInvokable();
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void setInvokable() {
Class<? extends UserSinkInvokable> userFunctionClass = configuration.getClass(
"userfunction", DefaultSinkInvokable.class, UserSinkInvokable.class);
userFunction = (UserSinkInvokable<Tuple>) getInvokable(userFunctionClass);
}
@Override
......@@ -67,9 +71,11 @@ public class StreamSink extends AbstractInvokable {
log.debug("SINK " + name + " invoked");
}
streamSinkHelper.invokeRecords(userFunction, inputs);
invokeRecords(userFunction, inputs);
if (log.isDebugEnabled()) {
log.debug("SINK " + name + " invoke finished");
}
}
}
......@@ -22,14 +22,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class StreamSource extends AbstractInvokable {
public class StreamSource extends AbstractStreamComponent {
private static final Log log = LogFactory.getLog(StreamSource.class);
......@@ -37,57 +36,70 @@ public class StreamSource extends AbstractInvokable {
private List<ChannelSelector<StreamRecord>> partitioners;
private UserSourceInvokable<Tuple> userFunction;
private static int numSources;
private int sourceInstanceID;
private String name;
private int[] numberOfOutputChannels;
// private FaultToleranceUtil recordBuffer;
// private FaultToleranceType faultToleranceType;
StreamComponentHelper streamSourceHelper;
public StreamSource() {
outputs = new LinkedList<RecordWriter<StreamRecord>>();
partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null;
streamSourceHelper = new StreamComponentHelper();
numSources = StreamComponentHelper.newComponent();
sourceInstanceID = numSources;
numSources = newComponent();
instanceID = numSources;
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
initialize();
try {
streamSourceHelper.setSerializers(taskConfiguration);
streamSourceHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
streamSourceHelper.setCollector(taskConfiguration, sourceInstanceID, outputs);
setSerializers();
setConfigOutputs(outputs, partitioners);
setCollector(outputs);
} catch (StreamComponentException e) {
if (log.isErrorEnabled()) {
log.error("Cannot register outputs", e);
}
}
int[] numberOfOutputChannels = new int[outputs.size()];
numberOfOutputChannels = new int[outputs.size()];
for (int i = 0; i < numberOfOutputChannels.length; i++) {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
}
userFunction = (UserSourceInvokable<Tuple>) streamSourceHelper
.getSourceInvokable(taskConfiguration);
// streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
// streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID, outputs);
setInvokable();
// streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID,
// outputs);
// streamSourceHelper.setFailListener(recordBuffer, sourceInstanceID,
// outputs);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void setInvokable() {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserSourceInvokable> userFunctionClass = configuration.getClass(
"userfunction", DefaultSourceInvokable.class, UserSourceInvokable.class);
userFunction = (UserSourceInvokable<Tuple>) getInvokable(userFunctionClass);
}
@Override
public void invoke() throws Exception {
if (log.isDebugEnabled()) {
log.debug("SOURCE " + name + " invoked with instance id " + sourceInstanceID);
log.debug("SOURCE " + name + " invoked with instance id " + instanceID);
}
for (RecordWriter<StreamRecord> output : outputs) {
output.initializeSerializers();
}
userFunction.invoke(streamSourceHelper.collector);
userFunction.invoke(collector);
if (log.isDebugEnabled()) {
log.debug("SOURCE " + name + " invoke finished with instance id " + instanceID);
}
}
}
......@@ -22,83 +22,87 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.template.AbstractInvokable;
import eu.stratosphere.runtime.io.api.AbstractRecordReader;
import eu.stratosphere.runtime.io.api.ChannelSelector;
import eu.stratosphere.runtime.io.api.RecordWriter;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamRecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class StreamTask extends AbstractInvokable {
public class StreamTask extends AbstractStreamComponent {
private static final Log log = LogFactory.getLog(StreamTask.class);
private AbstractRecordReader inputs;
private List<RecordWriter<StreamRecord>> outputs;
private List<ChannelSelector<StreamRecord>> partitioners;
private UserTaskInvokable<Tuple, Tuple> userFunction;
private StreamRecordInvokable<Tuple, Tuple> userFunction;
private int[] numberOfOutputChannels;
private static int numTasks;
private int taskInstanceID;
private String name;
private StreamComponentHelper streamTaskHelper;
// private FaultToleranceType faultToleranceType;
Configuration taskConfiguration;
// private FaultToleranceUtil recordBuffer;
public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here
outputs = new LinkedList<RecordWriter<StreamRecord>>();
partitioners = new LinkedList<ChannelSelector<StreamRecord>>();
userFunction = null;
numTasks = StreamComponentHelper.newComponent();
taskInstanceID = numTasks;
streamTaskHelper = new StreamComponentHelper();
numTasks = newComponent();
instanceID = numTasks;
}
@Override
public void registerInputOutput() {
taskConfiguration = getTaskConfiguration();
name = taskConfiguration.getString("componentName", "MISSING_COMPONENT_NAME");
initialize();
try {
streamTaskHelper.setSerializers(taskConfiguration);
inputs = streamTaskHelper.getConfigInputs(this, taskConfiguration);
streamTaskHelper.setConfigOutputs(this, taskConfiguration, outputs, partitioners);
streamTaskHelper.setCollector(taskConfiguration, taskInstanceID, outputs);
setSerializers();
inputs = getConfigInputs();
setConfigOutputs(outputs, partitioners);
setCollector(outputs);
} catch (StreamComponentException e) {
if (log.isErrorEnabled()) {
log.error("Cannot register inputs/outputs for " + getClass().getSimpleName(), e);
}
}
int[] numberOfOutputChannels = new int[outputs.size()];
numberOfOutputChannels = new int[outputs.size()];
for (int i = 0; i < numberOfOutputChannels.length; i++) {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
numberOfOutputChannels[i] = configuration.getInteger("channels_" + i, 0);
}
userFunction = (UserTaskInvokable<Tuple, Tuple>) streamTaskHelper
.getTaskInvokable(taskConfiguration);
setInvokable();
// streamTaskHelper.setAckListener(recordBuffer, taskInstanceID,
// outputs);
// streamTaskHelper.setFailListener(recordBuffer, taskInstanceID,
// outputs);
}
// streamTaskHelper.setAckListener(recordBuffer, taskInstanceID, outputs);
// streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs);
// TODO consider logging stack trace!
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
protected void setInvokable() {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends UserTaskInvokable> userFunctionClass = configuration.getClass(
"userfunction", DefaultTaskInvokable.class, UserTaskInvokable.class);
userFunction = (UserTaskInvokable<Tuple, Tuple>) getInvokable(userFunctionClass);
}
@Override
public void invoke() throws Exception {
if (log.isDebugEnabled()) {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
log.debug("TASK " + name + " invoked with instance id " + instanceID);
}
for (RecordWriter<StreamRecord> output : outputs) {
output.initializeSerializers();
}
streamTaskHelper.invokeRecords(userFunction, inputs);
invokeRecords(userFunction, inputs);
if (log.isDebugEnabled()) {
log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
log.debug("TASK " + name + " invoke finished with instance id " + instanceID);
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册