提交 63a2dfdc 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] StreamTask getClass TODO resolved

上级 cc0b496f
......@@ -29,7 +29,6 @@ public class StreamSink extends AbstractOutputTask {
private List<RecordReader<Record>> inputs;
private UserSinkInvokable userFunction;
private int numberOfInputs;
public StreamSink() {
......
......@@ -28,6 +28,7 @@ import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.test.RandIS;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
......@@ -38,12 +39,10 @@ public class StreamSource extends AbstractInputTask<RandIS> {
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private UserSourceInvokable userFunction;
private int numberOfOutputs;
private static int numSources = 0;
private String sourceInstanceID;
private int numberOfOutputs;
private Map<String, StreamRecord> recordBuffer;
public StreamSource() {
......@@ -54,9 +53,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
numberOfOutputs = 0;
numSources++;
sourceInstanceID = Integer.toString(numSources);
recordBuffer = new TreeMap<String, StreamRecord>();
}
@Override
......@@ -109,9 +106,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
ChannelSelector.class);
try {
// TODO: Fix class comparison
if (partitioner.getName().equals(
"eu.stratosphere.streaming.partitioner.FieldsPartitioner")) {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_"
+ nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass(
......
......@@ -29,6 +29,7 @@ import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
......@@ -40,13 +41,11 @@ public class StreamTask extends AbstractTask {
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private UserTaskInvokable userFunction;
private int numberOfInputs;
private int numberOfOutputs;
private static int numTasks = 0;
private String taskInstanceID = "";
private Map<String, StreamRecord> recordBuffer;
public StreamTask() {
......@@ -115,9 +114,7 @@ public class StreamTask extends AbstractTask {
ChannelSelector.class);
try {
// TODO: Fix class comparison
if (partitioner.getName().equals(
"eu.stratosphere.streaming.partitioner.FieldsPartitioner")) {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_"
+ nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass(
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册