diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/DefaultPartitioner.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/DefaultPartitioner.java index 8e456b04f0e662e55ca5963e2a582a4308017ec8..b600b782a2e9ed6de2c5514a7f55119c65561b10 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/DefaultPartitioner.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/DefaultPartitioner.java @@ -2,11 +2,12 @@ package eu.stratosphere.streaming; import eu.stratosphere.core.io.IOReadableWritable; import eu.stratosphere.nephele.io.ChannelSelector; +import eu.stratosphere.types.Record; -public class DefaultPartitioner implements ChannelSelector { +public class DefaultPartitioner implements ChannelSelector { @Override - public int[] selectChannels(IOReadableWritable record, int numberOfOutputChannels) { + public int[] selectChannels(Record record, int numberOfOutputChannels) { int[] returnChannels = new int[numberOfOutputChannels]; for(int i = 0; i < numberOfOutputChannels;i++) { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/MyStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/MyStream.java index d19f7c648859707cc5f203e6e52baec1c085a43c..a715013c9217d26adeacf463ffd3049fa0091005 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/MyStream.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/MyStream.java @@ -21,13 +21,15 @@ import eu.stratosphere.nephele.template.AbstractOutputTask; import eu.stratosphere.nephele.template.AbstractTask; import eu.stratosphere.pact.runtime.task.util.TaskConfig; import eu.stratosphere.test.util.TestBase2; +import eu.stratosphere.types.Record; +import eu.stratosphere.types.StringValue; public class MyStream extends TestBase2 { public static class InfoSource extends AbstractInputTask { - private RecordWriter output; - private Class> Partitioner; - ChannelSelector partitioner; + private RecordWriter output; + private Class> Partitioner; + ChannelSelector partitioner; private Class UserFunction; UserSourceInvokable userFunction; @@ -71,9 +73,7 @@ public class MyStream extends TestBase2 { @Override public void registerInputOutput() { setClassInputs(); - output = new RecordWriter(this, - IOReadableWritable.class, this.partitioner); - + output = new RecordWriter(this, Record.class, this.partitioner); } @Override @@ -84,7 +84,7 @@ public class MyStream extends TestBase2 { public static class QuerySource extends AbstractInputTask { - private RecordWriter output; + private RecordWriter output; @Override public RandIS[] computeInputSplits(int requestedMinNumber) throws Exception { @@ -100,8 +100,7 @@ public class MyStream extends TestBase2 { @Override public void registerInputOutput() { - output = new RecordWriter(this, StringRecord.class, - new StreamPartitioner()); + output = new RecordWriter(this, Record.class, new StreamPartitioner()); } @Override @@ -111,8 +110,8 @@ public class MyStream extends TestBase2 { for (int i = 0; i < 5; i++) { // output.emit(new // StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000)+" 500")); - output.emit(new StringRecord("5 510 100")); - output.emit(new StringRecord("4 510 100")); + output.emit(new Record(new StringValue("5 510 100"))); + output.emit(new Record(new StringValue("4 510 100"))); } } @@ -121,18 +120,21 @@ public class MyStream extends TestBase2 { public static class MySink extends AbstractOutputTask { - private RecordReader input = null; + private RecordReader input = null; @Override public void registerInputOutput() { - this.input = new RecordReader(this, StringRecord.class); + this.input = new RecordReader(this, Record.class); } @Override public void invoke() throws Exception { while (input.hasNext()) { - System.out.println(input.next().toString()); + StringValue value = new StringValue(""); + Record record = input.next(); + record.getFieldInto(0, value); + System.out.println(value.getValue()); } } @@ -229,8 +231,8 @@ public class MyStream extends TestBase2 { graphBuilder.connect("querySource", "cellTask", ChannelType.INMEMORY); graphBuilder.connect("cellTask", "sink", ChannelType.INMEMORY); - //return graphBuilder.getJobGraph(); - return myJG; + return graphBuilder.getJobGraph(); + //return myJG; } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamPartitioner.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamPartitioner.java index efd23eda50fba8feb0fee9cb8371f2e9ed9f29c1..800c740430363c825e4c8de2adee3dbbe04afc15 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamPartitioner.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamPartitioner.java @@ -2,15 +2,22 @@ package eu.stratosphere.streaming; import eu.stratosphere.core.io.StringRecord; import eu.stratosphere.nephele.io.ChannelSelector; +import eu.stratosphere.types.Record; +import eu.stratosphere.types.StringValue; -public class StreamPartitioner implements ChannelSelector { +public class StreamPartitioner implements ChannelSelector { - @Override + /*@Override public int[] selectChannels(StringRecord record, int numberOfOutputChannels) { - // TODO Auto-generated method stub - int cellId = Integer.parseInt(record.toString().split(" ")[0]); - + int cellId = Integer.parseInt(record.toString().split(" ")[0]); return new int[]{cellId % numberOfOutputChannels}; - } + }*/ + @Override + public int[] selectChannels(Record record, int numberOfOutputChannels) { + StringValue value = new StringValue(""); + record.getFieldInto(0, value); + int cellId = Integer.parseInt(value.getValue().split(" ")[0]); + return new int[]{cellId % numberOfOutputChannels}; + } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamSource.java index e7d51473618312cd545f92e2b934f777a65b6d63..8f3b8d21017e6997d3d5694f49eed0788f1ba3be 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamSource.java @@ -4,12 +4,13 @@ import eu.stratosphere.core.io.IOReadableWritable; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.template.AbstractInputTask; +import eu.stratosphere.types.Record; public class StreamSource extends AbstractInputTask { - private RecordWriter output; - private Class> Partitioner; - ChannelSelector partitioner; + private RecordWriter output; + private Class> Partitioner; + ChannelSelector partitioner; private Class UserFunction; private UserSourceInvokable userFunction; @@ -53,16 +54,14 @@ public class StreamSource extends AbstractInputTask { @Override public void registerInputOutput() { setClassInputs(); - output = new RecordWriter(this, - IOReadableWritable.class, this.partitioner); + output = new RecordWriter(this, + Record.class, this.partitioner); } @Override public void invoke() throws Exception { - userFunction.invoke(output); - } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamTask.java index b296aa160010b514622632a6a992fed15810788d..425a4f0c00cc6bf7d0d51878b1c69f1e6f36d58c 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamTask.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/StreamTask.java @@ -11,18 +11,19 @@ import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.template.AbstractInputTask; import eu.stratosphere.nephele.template.AbstractTask; import eu.stratosphere.streaming.cellinfo.WorkerEngineExact; +import eu.stratosphere.types.Record; public class StreamTask extends AbstractTask { - private RecordWriter output; - private Class> Partitioner; - ChannelSelector partitioner; + private RecordWriter output; + private Class> Partitioner; + ChannelSelector partitioner; private Class UserFunction; private UserTaskInvokable userFunction; - private RecordReader inputInfo = null; - private RecordReader inputQuery = null; + private RecordReader inputInfo = null; + private RecordReader inputQuery = null; @@ -55,19 +56,30 @@ public class StreamTask extends AbstractTask { @Override public void registerInputOutput() { setClassInputs(); - this.inputInfo = new RecordReader(this, IOReadableWritable.class); - this.inputQuery = new RecordReader(this, IOReadableWritable.class); - output = new RecordWriter(this, IOReadableWritable.class, this.partitioner); + this.inputInfo = new RecordReader(this, Record.class); + this.inputQuery = new RecordReader(this, Record.class); + output = new RecordWriter(this, Record.class, this.partitioner); } @Override public void invoke() throws Exception { - List< RecordReader> inputs = new ArrayList< RecordReader>(); + List< RecordReader> inputs = new ArrayList< RecordReader>(); inputs.add(inputInfo); inputs.add(inputQuery); - - userFunction.invoke(inputs,output); + + boolean hasInput = true; + while (hasInput) + { + hasInput = false; + for (RecordReader input : inputs) + { + if (input.hasNext()) { + hasInput = true; + userFunction.invoke(input.next(), output); + } + } + } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/TestSourceInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/TestSourceInvokable.java index 714100e749143588590183a4292699da2627b931..39ef248a3f356e2530f189011524b0b56d11435d 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/TestSourceInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/TestSourceInvokable.java @@ -4,16 +4,18 @@ package eu.stratosphere.streaming; import eu.stratosphere.core.io.IOReadableWritable; import eu.stratosphere.core.io.StringRecord; import eu.stratosphere.nephele.io.RecordWriter; +import eu.stratosphere.types.Record; +import eu.stratosphere.types.StringValue; +import eu.stratosphere.types.Value; public class TestSourceInvokable implements UserSourceInvokable { @Override - public void invoke(RecordWriter output) throws Exception { + public void invoke(RecordWriter output) throws Exception { for (int i = 0; i < 10; i++) { // output.emit(new StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000))); - output.emit(new StringRecord("5 500")); - output.emit(new StringRecord("4 500")); - + output.emit(new Record(new StringValue("5 500")));//new StringRecord("5 500")); + output.emit(new Record(new StringValue("4 500"))); } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/TestTaskInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/TestTaskInvokable.java index 206f9d8b5829b1529368d1e6b8ff6cd2db0da62e..aca9121a97af6ee105fca991f0b17a6eb231c8cd 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/TestTaskInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/TestTaskInvokable.java @@ -7,40 +7,61 @@ import eu.stratosphere.core.io.StringRecord; import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.streaming.cellinfo.WorkerEngineExact; +import eu.stratosphere.types.Record; +import eu.stratosphere.types.StringValue; public class TestTaskInvokable implements UserTaskInvokable { private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0); @Override - public void invoke(List> inputs, - RecordWriter output) throws Exception { - RecordReader input1= inputs.get(0); - RecordReader input2= inputs.get(0); - - while (input1.hasNext() && input2.hasNext()) { - String[] info = input1.next().toString().split(" "); - String[] query = input2.next().toString().split(" "); - - engine.put(Integer.parseInt(info[0]), Long.parseLong(info[1])); - - output.emit(new StringRecord(info[0] + " " + info[1])); - output.emit(new StringRecord(String.valueOf(engine.get( - Long.parseLong(query[1]), Long.parseLong(query[2]), - Integer.parseInt(query[0]))))); - } - while (inputs.get(0).hasNext()) { - - IOReadableWritable info = inputs.get(0).next(); - - output.emit(info); - } - while (inputs.get(1).hasNext()) { - - IOReadableWritable query = inputs.get(1).next(); - - output.emit(query); - } + public void invoke(Record record, + RecordWriter output) throws Exception { + StringValue value = new StringValue(); + record.getFieldInto(0, value); + String[] values = value.getValue().split(" "); + + //INFO + if (values.length == 2) + { + engine.put(Integer.parseInt(values[0]), Long.parseLong(values[1])); + output.emit(new Record(new StringValue(values[0] + " " + values[1]))); + } + //QUERY + else if (values.length == 3) + { + output.emit(new Record(new StringValue(String.valueOf(engine.get( + Long.parseLong(values[1]), Long.parseLong(values[2]), + Integer.parseInt(values[0])))))); + } + +// RecordReader input1= inputs.get(0); +// RecordReader input2= inputs.get(0); +// +// +// while (input1.hasNext() && input2.hasNext()) { +// String[] info = input1.next().toString().split(" "); +// String[] query = input2.next().toString().split(" "); +// +// engine.put(Integer.parseInt(info[0]), Long.parseLong(info[1])); +// +// output.emit(new StringRecord(info[0] + " " + info[1])); +// output.emit(new StringRecord(String.valueOf(engine.get( +// Long.parseLong(query[1]), Long.parseLong(query[2]), +// Integer.parseInt(query[0]))))); +// } +// while (inputs.get(0).hasNext()) { +// +// IOReadableWritable info = inputs.get(0).next(); +// +// output.emit(info); +// } +// while (inputs.get(1).hasNext()) { +// +// IOReadableWritable query = inputs.get(1).next(); +// +// output.emit(query); +// } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/UserSourceInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/UserSourceInvokable.java index 74fcd186f34bf95308db388c441aa7be15d73fc2..3344ae4dfea23a3a1c745f8d7767a1a2779fe632 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/UserSourceInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/UserSourceInvokable.java @@ -2,7 +2,8 @@ package eu.stratosphere.streaming; import eu.stratosphere.core.io.IOReadableWritable; import eu.stratosphere.nephele.io.RecordWriter; +import eu.stratosphere.types.Record; public interface UserSourceInvokable { - public void invoke(RecordWriter output) throws Exception ; + public void invoke(RecordWriter output) throws Exception ; } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/UserTaskInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/UserTaskInvokable.java index 86b18959a977c403dcf6885cc4cc1b34cb51a752..2923226d2da100169c6e69670bacca1dccf8fa1a 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/UserTaskInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/UserTaskInvokable.java @@ -5,10 +5,11 @@ import java.util.List; import eu.stratosphere.core.io.IOReadableWritable; import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordWriter; +import eu.stratosphere.types.Record; public interface UserTaskInvokable { - public void invoke(List> inputs, - RecordWriter output) throws Exception; + public void invoke(Record record, + RecordWriter output) throws Exception; }