提交 6d6d8a5a 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] FlatStreamRecored fixed

上级 2bbfa248
......@@ -18,7 +18,7 @@ public class FlatStreamRecord {
this();
this.numberOfFields = numberOfFields;
record = new Record(numberOfFields + 1);
record = new Record(numberOfFields);
LongValue idValue = new LongValue(id);
record.addField(idValue);
}
......
......@@ -31,11 +31,7 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
import eu.stratosphere.streaming.partitioner.ShufflePartitioner;
import eu.stratosphere.types.Key;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.types.Record;
public class JobGraphBuilder {
......@@ -43,6 +39,20 @@ public class JobGraphBuilder {
private final JobGraph jobGraph;
private Map<String, AbstractJobVertex> components;
public enum Partitioning {
BROADCAST
}
private Class<? extends ChannelSelector<Record>> getPartitioningClass(
Partitioning partitioning) {
switch (partitioning) {
case BROADCAST:
return DefaultPartitioner.class;
default:
return DefaultPartitioner.class;
}
}
public JobGraphBuilder(String jobGraphName) {
jobGraph = new JobGraph(jobGraphName);
......@@ -84,9 +94,8 @@ public class JobGraphBuilder {
components.put(sinkName, sink);
}
private void connect(String upStreamComponentName,
String downStreamComponentName,
Class<? extends ChannelSelector<Record>> PartitionerClass,
public void connect(String upStreamComponentName,
String downStreamComponentName, Partitioning partitionType,
ChannelType channelType) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
......@@ -98,68 +107,15 @@ public class JobGraphBuilder {
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
config.setClass(
"partitionerClass_"
+ upStreamComponent.getNumberOfForwardConnections(),
PartitionerClass);
"partitioner_" + upStreamComponent.getNumberOfForwardConnections(),
getPartitioningClass(partitionType));
// System.out.println(upStreamComponentName + " " + "partitioner_"
// + upStreamComponent.getNumberOfForwardConnections());
} catch (JobGraphDefinitionException e) {
e.printStackTrace();
}
}
public void broadcastConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
BroadcastPartitioner.class, ChannelType.INMEMORY);
}
public void fieldsConnect(String upStreamComponentName,
String downStreamComponentName, int keyPosition,
Class<? extends Key> keyClass) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.INMEMORY);
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
config.setClass(
"partitionerClass_"
+ upStreamComponent.getNumberOfForwardConnections(),
FieldsPartitioner.class);
config.setClass(
"partitionerClassParam_"
+ upStreamComponent.getNumberOfForwardConnections(), keyClass);
config.setInteger(
"partitionerIntParam_"
+ upStreamComponent.getNumberOfForwardConnections(), keyPosition);
} catch (JobGraphDefinitionException e) {
e.printStackTrace();
}
}
public void globalConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
GlobalPartitioner.class, ChannelType.INMEMORY);
}
public void ShuffleConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
ShufflePartitioner.class, ChannelType.INMEMORY);
}
private void setNumberOfJobInputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfInputs",
......
......@@ -68,7 +68,9 @@ public class StreamSink extends AbstractOutputTask {
for (RecordReader<Record> input : inputs) {
if (input.hasNext()) {
hasInput = true;
userFunction.invoke(input.next());
Record rec = input.next();
rec.removeField(rec.getNumFields()-1);
userFunction.invoke(rec);
}
}
}
......
......@@ -60,7 +60,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
Class<? extends ChannelSelector<Record>> partitioner;
for (int i = 1; i <= numberOfOutputs; i++) {
partitioner = getTaskConfiguration().getClass("partitionerClass_" + i,
partitioner = getTaskConfiguration().getClass("partitioner_" + i,
DefaultPartitioner.class, ChannelSelector.class);
try {
......
......@@ -26,7 +26,6 @@ import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class StreamTask extends AbstractTask {
......@@ -59,7 +58,7 @@ public class StreamTask extends AbstractTask {
Class<? extends ChannelSelector<Record>> partitioner;
for (int i = 1; i <= numberOfOutputs; i++) {
partitioner = getTaskConfiguration().getClass("partitionerClass_" + i,
partitioner = getTaskConfiguration().getClass("partitioner_" + i,
DefaultPartitioner.class, ChannelSelector.class);
try {
partitioners.add(partitioner.newInstance());
......@@ -87,8 +86,6 @@ public class StreamTask extends AbstractTask {
@Override
public void registerInputOutput() {
setConfigInputs();
Record r = new Record();
r.addField(new StringValue(""));
}
@Override
......@@ -99,7 +96,9 @@ public class StreamTask extends AbstractTask {
for (RecordReader<Record> input : inputs) {
if (input.hasNext()) {
hasInput = true;
userFunction.invoke(new FlatStreamRecord(input.next()));
Record rec = input.next();
rec.removeField(rec.getNumFields()-1);
userFunction.invoke(rec);
}
}
}
......
......@@ -15,7 +15,6 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
......@@ -27,7 +26,7 @@ public class DefaultSourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (CharSequence word : mottoArray) {
emit(new FlatStreamRecord(new Record(new StringValue(word))));
emit(new Record(new StringValue(word)));
}
}
......
......@@ -15,12 +15,12 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.types.Record;
public class DefaultTaskInvokable extends UserTaskInvokable {
@Override
public void invoke(FlatStreamRecord streamRecord) throws Exception {
emit(streamRecord);
public void invoke(Record record) throws Exception {
emit(record);
}
}
\ No newline at end of file
......@@ -14,13 +14,14 @@ public abstract class StreamInvokable {
this.outputs = outputs;
}
public final void emit(FlatStreamRecord streamRecord) {
public final void emit(Record record) {
for (RecordWriter<Record> output : outputs) {
try {
FlatStreamRecord streamRecord = new FlatStreamRecord(record);
output.emit(streamRecord.getRecord());
} catch (Exception e) {
System.out.println("Emit error");
}
}
}
}
}
\ No newline at end of file
......@@ -18,6 +18,5 @@ package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.types.Record;
public interface UserSinkInvokable {
public void invoke(Record record) throws Exception;
}
\ No newline at end of file
......@@ -15,11 +15,11 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.types.Record;
public abstract class UserTaskInvokable extends StreamInvokable {
public void invoke(FlatStreamRecord record) throws Exception {
public void invoke(Record record) throws Exception {
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Record;
public class BroadcastPartitioner implements ChannelSelector<Record> {
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
int[] returnChannels = new int[numberOfOutputChannels];
for (int i = 0; i < numberOfOutputChannels; i++) {
returnChannels[i] = i;
}
return returnChannels;
}
}
\ No newline at end of file
......@@ -23,8 +23,11 @@ public class DefaultPartitioner implements ChannelSelector<Record> {
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
return new BroadcastPartitioner().selectChannels(record, numberOfOutputChannels);
int[] returnChannels = new int[numberOfOutputChannels];
for(int i = 0; i < numberOfOutputChannels;i++) {
returnChannels[i]=i;
}
return returnChannels;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
//Grouping by a key
public class FieldsPartitioner implements ChannelSelector<Record> {
private int keyPosition;
private Class<? extends Key> keyClass;
// TODO: make sure it is actually a key
public FieldsPartitioner(int keyPosition, Class<? extends Key> keyClass) {
this.keyPosition = keyPosition;
this.keyClass = keyClass;
}
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
Key key = null;
try {
key = keyClass.newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
record.getFieldInto(keyPosition, key);
return new int[] { key.hashCode() % numberOfOutputChannels };
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.partitioner;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Record;
//Randomly group, to distribute equally
public class GlobalPartitioner implements ChannelSelector<Record> {
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
return new int[]{0};
}
}
......@@ -15,17 +15,31 @@
package eu.stratosphere.streaming.partitioner;
import java.util.Random;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
//Randomly group, to distribute equally
public class ShufflePartitioner implements ChannelSelector<Record> {
private int keyPosition;
private Class<? extends Key> keyClass;
//TODO: make sure it is actually a key
public ShufflePartitioner(int keyPosition, Class<? extends Key> keyClass){
this.keyPosition = keyPosition;
}
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
Random random = new Random();
return new int[]{random.nextInt(numberOfOutputChannels)};
Key key = null;
try {
key = keyClass.newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
}
record.getFieldInto(keyPosition, key);
return new int[]{key.hashCode() % numberOfOutputChannels};
}
}
......@@ -15,11 +15,10 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.JobGraphBuilder.Partitioning;
import eu.stratosphere.test.util.TestBase2;
public class MyStream extends TestBase2 {
......@@ -27,14 +26,17 @@ public class MyStream extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", DefaultSourceInvokable.class);
graphBuilder.setSource("querySource", DefaultSourceInvokable.class);
graphBuilder.setTask("cellTask", DefaultTaskInvokable.class, 2);
graphBuilder.setSink("sink", DefaultSinkInvokable.class);
graphBuilder.setSource("infoSource", TestSourceInvokable.class);
graphBuilder.setSource("querySource", QuerySourceInvokable.class);
graphBuilder.setTask("cellTask", TestTaskInvokable.class, 2);
graphBuilder.setSink("sink", TestSinkInvokable.class);
graphBuilder.globalConnect("infoSource", "cellTask");
graphBuilder.broadcastConnect("querySource", "cellTask");
graphBuilder.broadcastConnect("cellTask", "sink");
graphBuilder.connect("infoSource", "cellTask", Partitioning.BROADCAST,
ChannelType.INMEMORY);
graphBuilder.connect("querySource", "cellTask", Partitioning.BROADCAST,
ChannelType.INMEMORY);
graphBuilder.connect("cellTask", "sink", Partitioning.BROADCAST,
ChannelType.INMEMORY);
return graphBuilder.getJobGraph();
}
......
......@@ -15,7 +15,6 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
......@@ -36,8 +35,8 @@ public class QuerySourceInvokable extends UserSourceInvokable {
record2.setField(1, new LongValue(510));
record2.setField(2, new LongValue(100));
emit(new FlatStreamRecord(record1));
emit(new FlatStreamRecord(record2));
emit(record1);
emit(record2);
}
}
......
......@@ -22,9 +22,8 @@ import eu.stratosphere.types.StringValue;
public class TestSinkInvokable implements UserSinkInvokable {
@Override
public void invoke(Record record) throws Exception {
StringValue value = new StringValue("");
public void invoke(Record record) throws Exception {
StringValue value = new StringValue("");
record.getFieldInto(0, value);
System.out.println(value.getValue());
}
......
......@@ -15,7 +15,6 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
......@@ -32,8 +31,8 @@ public class TestSourceInvokable extends UserSourceInvokable {
Record record2 = new Record(2);
record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(500));
emit(new FlatStreamRecord(record1));
emit(new FlatStreamRecord(record2));
emit(record1);
emit(record2);
}
}
......
......@@ -15,7 +15,6 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.test.cellinfo.WorkerEngineExact;
import eu.stratosphere.types.IntValue;
......@@ -28,23 +27,23 @@ public class TestTaskInvokable extends UserTaskInvokable {
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
@Override
public void invoke(FlatStreamRecord record) throws Exception {
public void invoke(Record record) throws Exception {
IntValue value1 = new IntValue(0);
record.getFieldInto(0, value1);
LongValue value2 = new LongValue(0);
record.getFieldInto(1, value2);
// INFO
if (record.getNumFields() == 2) {
engine.put(value1.getValue(), value2.getValue());
emit(new FlatStreamRecord(new Record(new StringValue(value1 + " " + value2))));
emit(new Record(new StringValue(value1 + " " + value2)));
}
// QUERY
else if (record.getNumFields() == 3) {
LongValue value3 = new LongValue(0);
record.getFieldInto(2, value3);
emit(new FlatStreamRecord(new Record(new StringValue(String.valueOf(engine.get(
value2.getValue(), value3.getValue(), value1.getValue()))))));
emit(new Record(new StringValue(String.valueOf(engine.get(
value2.getValue(), value3.getValue(), value1.getValue())))));
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册