提交 d4d8c40c 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] updated fields partitioning api

上级 5030bec4
...@@ -236,8 +236,7 @@ public class JobGraphBuilder { ...@@ -236,8 +236,7 @@ public class JobGraphBuilder {
* Class of the key Value stored in the record * Class of the key Value stored in the record
*/ */
//TODO: remove unused third parameter //TODO: remove unused third parameter
public void fieldsConnect(String upStreamComponentName, String downStreamComponentName, int keyPosition, public void fieldsConnect(String upStreamComponentName, String downStreamComponentName, int keyPosition) {
Class<? extends Key> keyClass) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName); AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName); AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
...@@ -250,9 +249,6 @@ public class JobGraphBuilder { ...@@ -250,9 +249,6 @@ public class JobGraphBuilder {
config.setClass("partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1), config.setClass("partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
FieldsPartitioner.class); FieldsPartitioner.class);
config.setClass("partitionerClassParam_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
keyClass);
config.setInteger("partitionerIntParam_" + (upStreamComponent.getNumberOfForwardConnections() - 1), config.setInteger("partitionerIntParam_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
keyPosition); keyPosition);
......
...@@ -42,8 +42,6 @@ import eu.stratosphere.streaming.faulttolerance.FailEventListener; ...@@ -42,8 +42,6 @@ import eu.stratosphere.streaming.faulttolerance.FailEventListener;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil; import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner; import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner; import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.StringValue;
public final class StreamComponentHelper<T extends AbstractInvokable> { public final class StreamComponentHelper<T extends AbstractInvokable> {
private static final Log log = LogFactory.getLog(StreamComponentHelper.class); private static final Log log = LogFactory.getLog(StreamComponentHelper.class);
...@@ -171,11 +169,10 @@ public final class StreamComponentHelper<T extends AbstractInvokable> { ...@@ -171,11 +169,10 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
if (partitioner.equals(FieldsPartitioner.class)) { if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration int keyPosition = taskConfiguration
.getInteger("partitionerIntParam_" + nrOutput, 1); .getInteger("partitionerIntParam_" + nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass("partitionerClassParam_"
+ nrOutput, StringValue.class, Key.class);
partitioners.add(partitioner.getConstructor(int.class, Class.class).newInstance(
keyPosition, keyClass)); partitioners.add(partitioner.getConstructor(int.class).newInstance(
keyPosition));
} else { } else {
partitioners.add(partitioner.newInstance()); partitioners.add(partitioner.newInstance());
......
...@@ -25,7 +25,6 @@ import eu.stratosphere.configuration.Configuration; ...@@ -25,7 +25,6 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils; import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.types.StringValue;
public class BatchWordCountLocal { public class BatchWordCountLocal {
...@@ -37,8 +36,7 @@ public class BatchWordCountLocal { ...@@ -37,8 +36,7 @@ public class BatchWordCountLocal {
graphBuilder.setSink("BatchWordCountSink", BatchWordCountSink.class); graphBuilder.setSink("BatchWordCountSink", BatchWordCountSink.class);
graphBuilder.shuffleConnect("BatchWordCountSource", "BatchWordCountSplitter"); graphBuilder.shuffleConnect("BatchWordCountSource", "BatchWordCountSplitter");
graphBuilder.fieldsConnect("BatchWordCountSplitter", "BatchWordCountCounter", 0, graphBuilder.fieldsConnect("BatchWordCountSplitter", "BatchWordCountCounter", 0);
StringValue.class);
graphBuilder.shuffleConnect("BatchWordCountCounter", "BatchWordCountSink"); graphBuilder.shuffleConnect("BatchWordCountCounter", "BatchWordCountSink");
return graphBuilder.getJobGraph(); return graphBuilder.getJobGraph();
......
...@@ -25,7 +25,6 @@ import eu.stratosphere.configuration.Configuration; ...@@ -25,7 +25,6 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils; import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.types.IntValue;
public class CellInfoLocal { public class CellInfoLocal {
...@@ -36,9 +35,8 @@ public class CellInfoLocal { ...@@ -36,9 +35,8 @@ public class CellInfoLocal {
graphBuilder.setTask("cellTask", CellTask.class, 3); graphBuilder.setTask("cellTask", CellTask.class, 3);
graphBuilder.setSink("sink", CellSink.class); graphBuilder.setSink("sink", CellSink.class);
graphBuilder.fieldsConnect("infoSource", "cellTask", 0, IntValue.class); graphBuilder.fieldsConnect("infoSource", "cellTask", 0);
graphBuilder graphBuilder.fieldsConnect("querySource", "cellTask", 0);
.fieldsConnect("querySource", "cellTask", 0, IntValue.class);
graphBuilder.shuffleConnect("cellTask", "sink"); graphBuilder.shuffleConnect("cellTask", "sink");
return graphBuilder.getJobGraph(); return graphBuilder.getJobGraph();
...@@ -63,8 +61,7 @@ public class CellInfoLocal { ...@@ -63,8 +61,7 @@ public class CellInfoLocal {
exec.start(); exec.start();
Client client = new Client(new InetSocketAddress("localhost", Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
6498), configuration);
client.run(jG, true); client.run(jG, true);
...@@ -72,8 +69,8 @@ public class CellInfoLocal { ...@@ -72,8 +69,8 @@ public class CellInfoLocal {
} else if (args[0].equals("cluster")) { } else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode"); System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress( Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
"hadoop02.ilab.sztaki.hu", 6123), configuration); configuration);
client.run(jG, true); client.run(jG, true);
} }
......
...@@ -25,7 +25,6 @@ import eu.stratosphere.configuration.Configuration; ...@@ -25,7 +25,6 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils; import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.types.StringValue;
//TODO: window operator remains unfinished. //TODO: window operator remains unfinished.
public class WindowWordCountLocal { public class WindowWordCountLocal {
...@@ -38,8 +37,7 @@ public class WindowWordCountLocal { ...@@ -38,8 +37,7 @@ public class WindowWordCountLocal {
graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class); graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class);
graphBuilder.broadcastConnect("WindowWordCountSource", "WindowWordCountSplitter"); graphBuilder.broadcastConnect("WindowWordCountSource", "WindowWordCountSplitter");
graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0, graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0);
StringValue.class);
graphBuilder.broadcastConnect("WindowWordCountCounter", "WindowWordCountSink"); graphBuilder.broadcastConnect("WindowWordCountCounter", "WindowWordCountSink");
return graphBuilder.getJobGraph(); return graphBuilder.getJobGraph();
......
...@@ -25,7 +25,6 @@ import eu.stratosphere.configuration.Configuration; ...@@ -25,7 +25,6 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils; import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.types.StringValue;
public class WordCountLocal { public class WordCountLocal {
...@@ -37,7 +36,7 @@ public class WordCountLocal { ...@@ -37,7 +36,7 @@ public class WordCountLocal {
graphBuilder.setSink("WordCountSink", WordCountSink.class); graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter"); graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class); graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink"); graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph(); return graphBuilder.getJobGraph();
......
...@@ -27,7 +27,6 @@ import eu.stratosphere.core.fs.Path; ...@@ -27,7 +27,6 @@ import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils; import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.types.StringValue;
public class WordCountRemote { public class WordCountRemote {
...@@ -39,7 +38,7 @@ public class WordCountRemote { ...@@ -39,7 +38,7 @@ public class WordCountRemote {
graphBuilder.setSink("WordCountSink", WordCountSink.class); graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter"); graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0, StringValue.class); graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink"); graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph(); return graphBuilder.getJobGraph();
......
...@@ -17,35 +17,20 @@ package eu.stratosphere.streaming.partitioner; ...@@ -17,35 +17,20 @@ package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.Key;
//Grouping by a key //Grouping by a key
public class FieldsPartitioner implements ChannelSelector<StreamRecord> { public class FieldsPartitioner implements ChannelSelector<StreamRecord> {
private int keyPosition; private int keyPosition;
// private Class<? extends Key> keyClass;
public FieldsPartitioner(int keyPosition, Class<? extends Key> keyClass) { public FieldsPartitioner(int keyPosition) {
this.keyPosition = keyPosition; this.keyPosition = keyPosition;
// this.keyClass = keyClass;
} }
@Override @Override
public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) { public int[] selectChannels(StreamRecord record, int numberOfOutputChannels) {
//TODO:fix this //TODO:Better hashing?
// Key key = null;
// try {
// key = keyClass.newInstance();
// } catch (InstantiationException e) {
// e.printStackTrace();
// } catch (IllegalAccessException e) {
// e.printStackTrace();
// }
// TODO: consider hash partition the whole record batch.
//
// }
// key = keyClass.cast(record.getField(0, keyPosition));
return new int[] { Math.abs(record.getField(0, keyPosition).hashCode()) % numberOfOutputChannels }; return new int[] { Math.abs(record.getField(0, keyPosition).hashCode()) % numberOfOutputChannels };
} }
} }
...@@ -15,14 +15,16 @@ ...@@ -15,14 +15,16 @@
package eu.stratosphere.streaming.faulttolerance; package eu.stratosphere.streaming.faulttolerance;
import static org.junit.Assert.*; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class AtLeastOnceBufferTest { public class AtLeastOnceBufferTest {
......
...@@ -15,14 +15,16 @@ ...@@ -15,14 +15,16 @@
package eu.stratosphere.streaming.faulttolerance; package eu.stratosphere.streaming.faulttolerance;
import static org.junit.Assert.*; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class ExactlyOnceBufferTest { public class ExactlyOnceBufferTest {
......
...@@ -15,12 +15,6 @@ ...@@ -15,12 +15,6 @@
package eu.stratosphere.streaming.faulttolerance; package eu.stratosphere.streaming.faulttolerance;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
...@@ -29,8 +23,6 @@ import org.junit.Test; ...@@ -29,8 +23,6 @@ import org.junit.Test;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.types.StringValue;
public class FaultToleranceUtilTest { public class FaultToleranceUtilTest {
......
...@@ -23,7 +23,6 @@ import org.junit.Test; ...@@ -23,7 +23,6 @@ import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class FieldsPartitionerTest { public class FieldsPartitionerTest {
...@@ -35,7 +34,7 @@ public class FieldsPartitionerTest { ...@@ -35,7 +34,7 @@ public class FieldsPartitionerTest {
@Before @Before
public void setPartitioner() { public void setPartitioner() {
fieldsPartitioner = new FieldsPartitioner(0, StringValue.class); fieldsPartitioner = new FieldsPartitioner(0);
} }
@Test @Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册