提交 b933fcbf 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] CellInfo refactor

上级 55f880bd
......@@ -13,25 +13,22 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test;
package eu.stratosphere.streaming.test.cellinfo;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.types.StringValue;
public class MyStream extends TestBase2 {
public class CellInfo extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", DefaultSourceInvokable.class);
graphBuilder.setSource("querySource", DefaultSourceInvokable.class);
graphBuilder.setTask("cellTask", DefaultTaskInvokable.class, 2);
graphBuilder.setSink("sink", DefaultSinkInvokable.class);
graphBuilder.setSource("infoSource", InfoSourceInvokable.class);
graphBuilder.setSource("querySource", QuerySourceInvokable.class);
graphBuilder.setTask("cellTask", CellTaskInvokable.class, 2);
graphBuilder.setSink("sink", CellSinkInvokable.class);
graphBuilder.fieldsConnect("infoSource", "cellTask", 0, StringValue.class);
graphBuilder.fieldsConnect("querySource", "cellTask",0, StringValue.class);
......
......@@ -13,13 +13,13 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test;
package eu.stratosphere.streaming.test.cellinfo;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.StringValue;
public class TestSinkInvokable implements UserSinkInvokable {
public class CellSinkInvokable implements UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
......
......@@ -13,17 +13,15 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test;
package eu.stratosphere.streaming.test.cellinfo;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.test.cellinfo.WorkerEngineExact;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class TestTaskInvokable extends UserTaskInvokable {
public class CellTaskInvokable extends UserTaskInvokable {
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
......@@ -35,15 +33,13 @@ public class TestTaskInvokable extends UserTaskInvokable {
// INFO
if (record.getNumOfFields() == 2) {
engine.put(value1.getValue(), value2.getValue());
emit(new StreamRecord(new AtomRecord(new StringValue(value1 + " "
+ value2))));
emit(new StreamRecord(new StringValue(value1 + " " + value2)));
}
// QUERY
else if (record.getNumOfFields() == 3) {
LongValue value3 = (LongValue) record.getField(0, 2);
emit(new StreamRecord(new AtomRecord(new StringValue(
String.valueOf(engine.get(value2.getValue(),
value3.getValue(), value1.getValue()))))));
emit(new StreamRecord(new StringValue(String.valueOf(engine.get(
value2.getValue(), value3.getValue(), value1.getValue())))));
}
}
}
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test;
package eu.stratosphere.streaming.test.cellinfo;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
......@@ -21,7 +21,7 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
public class TestSourceInvokable extends UserSourceInvokable {
public class InfoSourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test;
package eu.stratosphere.streaming.test.cellinfo;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册