diff --git a/flink-addons/flink-streaming/pom.xml b/flink-addons/flink-streaming/pom.xml index 5eade27a2f2c782824e4cd47f78bf79f0e87e58f..119900249dd3e6ee1dedbe6a3763b4d27768c346 100644 --- a/flink-addons/flink-streaming/pom.xml +++ b/flink-addons/flink-streaming/pom.xml @@ -5,7 +5,7 @@ 4.0.0 eu.stratosphere - 0.5 + 0.5-rc2 stratosphere-streaming stratosphere-streaming @@ -83,6 +83,11 @@ amqp-client 3.3.1 + + org.apache.kafka + kafka_2.10 + 0.8.1.1 + diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamWindowTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamWindowTask.java index 720972178a43c921f8bcec5a59dccc70a9eab0f2..0ec2077f3a0bcef164795fc5f5a93b874b9663c8 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamWindowTask.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamWindowTask.java @@ -18,7 +18,7 @@ package eu.stratosphere.streaming.api.streamcomponent; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.state.MutableTableState; -import eu.stratosphere.streaming.state.WindowState; +import eu.stratosphere.streaming.state.SlidingWindowState; public class StreamWindowTask extends UserTaskInvokable { private static final long serialVersionUID = 1L; @@ -27,7 +27,7 @@ public class StreamWindowTask extends UserTaskInvokable { private int windowFieldId = 1; private StreamRecord tempRecord; - private WindowState window; + private SlidingWindowState window; private MutableTableState sum; private long initTimestamp = -1; private long nextTimestamp = -1; @@ -36,7 +36,7 @@ public class StreamWindowTask extends UserTaskInvokable { int computeGranularity, int windowFieldId) { this.computeGranularity = computeGranularity; this.windowFieldId = windowFieldId; - window = new WindowState(windowSize, slidingStep, + window = new SlidingWindowState(windowSize, slidingStep, computeGranularity); sum = new MutableTableState(); sum.put("sum", 0); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeLocal.java new file mode 100644 index 0000000000000000000000000000000000000000..b33cc0e637b3b8f0fcd50f34c76b55623ec97834 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeLocal.java @@ -0,0 +1,86 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.iterative; + +import java.net.InetSocketAddress; + +import org.apache.log4j.Level; + +import eu.stratosphere.client.minicluster.NepheleMiniCluster; +import eu.stratosphere.client.program.Client; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; +import eu.stratosphere.streaming.util.LogUtils; + +public class IterativeLocal { + + public static JobGraph getJobGraph() { + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); + graphBuilder.setSource("IterativeSource", IterativeSource.class); + graphBuilder.setTask("IterativeParallel", IterativeParallel.class, 1, 1); + graphBuilder.setTask("IterativeStateHolder", IterativeStateHolder.class); + graphBuilder.setSink("IterativeSink", IterativeSink.class); + + graphBuilder.fieldsConnect("IterativeSource", "IterativeParallel", 1); + graphBuilder.fieldsConnect("IterativeParallel", "IterativeStateHolder", 1); + graphBuilder.globalConnect("IterativeStateHolder", "IterativeSink"); + + return graphBuilder.getJobGraph(); + } + + public static void main(String[] args) { + + LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO); + + try { + + JobGraph jG = getJobGraph(); + Configuration configuration = jG.getJobConfiguration(); + + if (args.length == 0) { + args = new String[] { "local" }; + } + + if (args[0].equals("local")) { + System.out.println("Running in Local mode"); + NepheleMiniCluster exec = new NepheleMiniCluster(); + + exec.start(); + + Client client = new Client(new InetSocketAddress("localhost", 6498), configuration); + + client.run(jG, true); + + exec.stop(); + + } else if (args[0].equals("cluster")) { + System.out.println("Running in Cluster2 mode"); + + Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), + configuration); + + client.run(jG, true); + + } + + } catch (Exception e) { + System.out.println(e); + } + + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeParallel.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeParallel.java new file mode 100644 index 0000000000000000000000000000000000000000..f6df59887fdc6028bf6cb923e09dd77ca12f2387 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeParallel.java @@ -0,0 +1,34 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.iterative; + + +import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class IterativeParallel extends UserTaskInvokable { + + private static final long serialVersionUID = -3042489460184024483L; + + public IterativeParallel() { + } + + @Override + public void invoke(StreamRecord record) throws Exception { + // TODO Auto-generated method stub + + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSink.java new file mode 100644 index 0000000000000000000000000000000000000000..6ea080b62621b374543361e88104c04a232bb505 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSink.java @@ -0,0 +1,37 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.iterative; + +import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class IterativeSink extends UserSinkInvokable { + + private static final long serialVersionUID = -1989637817643875304L; + + @Override + public void invoke(StreamRecord record) throws Exception { + System.out.println("received record..."); + int tupleNum = record.getNumOfTuples(); + System.out.println("============================================"); + for (int i = 0; i < tupleNum; ++i) { + System.out.println("name=" + record.getField(i, 0) + ", grade=" + + record.getField(i, 1) + ", salary=" + + record.getField(i, 2)); + } + System.out.println("============================================"); + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSource.java new file mode 100644 index 0000000000000000000000000000000000000000..ace51bec8b9ca390476a1523c03b6a15ec66145d --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeSource.java @@ -0,0 +1,30 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.iterative; + +import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; + +public class IterativeSource extends UserSourceInvokable { + + private static final long serialVersionUID = 8983174839600079890L; + + @Override + public void invoke() throws Exception { + // TODO Auto-generated method stub + + } + +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeStateHolder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeStateHolder.java new file mode 100644 index 0000000000000000000000000000000000000000..4200bffdc361b0c9a141c6b8c3a8f01f06c8a527 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/IterativeStateHolder.java @@ -0,0 +1,34 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.iterative; + + +import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class IterativeStateHolder extends UserTaskInvokable { + + private static final long serialVersionUID = -3042489460184024483L; + + public IterativeStateHolder() { + } + + @Override + public void invoke(StreamRecord record) throws Exception { + // TODO Auto-generated method stub + + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringLocal.java new file mode 100644 index 0000000000000000000000000000000000000000..9ddca15c6455c420a6d8bc20fa1ad2901f96f1a5 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/collaborativefilter/CollaborativeFilteringLocal.java @@ -0,0 +1,25 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.iterative.collaborativefilter; + +public class CollaborativeFilteringLocal { + + public static void main(String[] args) { + // TODO Auto-generated method stub + + } + +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansLocal.java new file mode 100644 index 0000000000000000000000000000000000000000..2286ffff642eb01e0e56762d70d4093b086a41df --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/kmeans/KMeansLocal.java @@ -0,0 +1,25 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.iterative.kmeans; + +public class KMeansLocal { + + public static void main(String[] args) { + // TODO Auto-generated method stub + + } + +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PagerankLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PagerankLocal.java new file mode 100644 index 0000000000000000000000000000000000000000..e06d04d576fb3f6505bfdfecf7f2c26dee482eda --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/pagerank/PagerankLocal.java @@ -0,0 +1,25 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.iterative.pagerank; + +public class PagerankLocal { + + public static void main(String[] args) { + // TODO Auto-generated method stub + + } + +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPLocal.java new file mode 100644 index 0000000000000000000000000000000000000000..6c06a467e11ab02b4c35f785b708be3bdc2de91e --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/iterative/sssp/SSSPLocal.java @@ -0,0 +1,25 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.iterative.sssp; + +public class SSSPLocal { + + public static void main(String[] args) { + // TODO Auto-generated method stub + + } + +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinLocal.java new file mode 100644 index 0000000000000000000000000000000000000000..7b0142aee76a8c59ffc96f9ca9a5516500dbc76c --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinLocal.java @@ -0,0 +1,86 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.join; + +import java.net.InetSocketAddress; + +import org.apache.log4j.Level; + +import eu.stratosphere.client.minicluster.NepheleMiniCluster; +import eu.stratosphere.client.program.Client; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; +import eu.stratosphere.streaming.util.LogUtils; + +public class JoinLocal { + + public static JobGraph getJobGraph() { + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); + graphBuilder.setSource("JoinSourceOne", JoinSourceOne.class); + graphBuilder.setSource("JoinSourceTwo", JoinSourceTwo.class); + graphBuilder.setTask("JoinTask", JoinTask.class, 1, 1); + graphBuilder.setSink("JoinSink", JoinSink.class); + + graphBuilder.fieldsConnect("JoinSourceOne", "JoinTask", 1); + graphBuilder.fieldsConnect("JoinSourceTwo", "JoinTask", 1); + graphBuilder.shuffleConnect("JoinTask", "JoinSink"); + + return graphBuilder.getJobGraph(); + } + + public static void main(String[] args) { + + LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO); + + try { + + JobGraph jG = getJobGraph(); + Configuration configuration = jG.getJobConfiguration(); + + if (args.length == 0) { + args = new String[] { "local" }; + } + + if (args[0].equals("local")) { + System.out.println("Running in Local mode"); + NepheleMiniCluster exec = new NepheleMiniCluster(); + + exec.start(); + + Client client = new Client(new InetSocketAddress("localhost", 6498), configuration); + + client.run(jG, true); + + exec.stop(); + + } else if (args[0].equals("cluster")) { + System.out.println("Running in Cluster2 mode"); + + Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), + configuration); + + client.run(jG, true); + + } + + } catch (Exception e) { + System.out.println(e); + } + + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinSink.java new file mode 100644 index 0000000000000000000000000000000000000000..1c39adaa42176c5e76b11982ed485185086ac430 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinSink.java @@ -0,0 +1,36 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.join; + +import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class JoinSink extends UserSinkInvokable { + private static final long serialVersionUID = 1L; + + @Override + public void invoke(StreamRecord record) throws Exception { + System.out.println("received record..."); + int tupleNum = record.getNumOfTuples(); + System.out.println("============================================"); + for (int i = 0; i < tupleNum; ++i) { + System.out.println("name=" + record.getField(i, 0) + ", grade=" + + record.getField(i, 1) + ", salary=" + + record.getField(i, 2)); + } + System.out.println("============================================"); + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinSourceOne.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinSourceOne.java new file mode 100644 index 0000000000000000000000000000000000000000..4562d02642d83e60ccd54814530657a3cb6ba6e4 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinSourceOne.java @@ -0,0 +1,44 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.join; + +import java.util.Random; + +import eu.stratosphere.api.java.tuple.Tuple3; +import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class JoinSourceOne extends UserSourceInvokable { + + private static final long serialVersionUID = 6670933703432267728L; + + private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", + "sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge", + "black", "peter" }; + private Random rand = new Random(); + private StreamRecord outRecord = new StreamRecord( + new Tuple3()); + + @Override + public void invoke() throws Exception { + while (true) { + outRecord.setString(0, "salary"); + outRecord.setString(1, names[rand.nextInt(names.length)]); + outRecord.setInteger(2, rand.nextInt(10000)); + emit(outRecord); + } + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinSourceTwo.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinSourceTwo.java new file mode 100644 index 0000000000000000000000000000000000000000..1a9ed34480a0c012eba6aac781b2ab4816ee451d --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinSourceTwo.java @@ -0,0 +1,44 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.join; + +import java.util.Random; + +import eu.stratosphere.api.java.tuple.Tuple3; +import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class JoinSourceTwo extends UserSourceInvokable { + + private static final long serialVersionUID = -5897483980082089771L; + + private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", + "sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge", + "black", "peter" }; + private Random rand = new Random(); + private StreamRecord outRecord = new StreamRecord( + new Tuple3()); + + @Override + public void invoke() throws Exception { + while (true) { + outRecord.setString(0, "grade"); + outRecord.setString(1, names[rand.nextInt(names.length)]); + outRecord.setString(2, String.valueOf((char)(rand.nextInt(26)+'A'))); + emit(outRecord); + } + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinTask.java new file mode 100644 index 0000000000000000000000000000000000000000..0da311855cb70dfdaa60e6a3440d995a9b99a8d5 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/join/JoinTask.java @@ -0,0 +1,72 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.join; + +import java.util.ArrayList; +import java.util.HashMap; + +import eu.stratosphere.api.java.tuple.Tuple3; +import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class JoinTask extends UserTaskInvokable { + private static final long serialVersionUID = 749913336259789039L; + + private HashMap> gradeHashmap; + private HashMap> salaryHashmap; + private StreamRecord outRecord = new StreamRecord(3); + + public JoinTask() { + gradeHashmap = new HashMap>(); + salaryHashmap = new HashMap>(); + } + + @Override + public void invoke(StreamRecord record) throws Exception { + // TODO Auto-generated method stub + String streamId = record.getString(0); + String name = record.getString(1); + if (streamId.equals("grade")) { + if (salaryHashmap.containsKey(name)) { + for (Integer salary : salaryHashmap.get(name)) { + Tuple3 outputTuple = new Tuple3( + name, record.getString(2), salary); + outRecord.addTuple(outputTuple); + } + emit(outRecord); + outRecord.Clear(); + } + if (!gradeHashmap.containsKey(name)) { + gradeHashmap.put(name, new ArrayList()); + } + gradeHashmap.get(name).add(record.getString(2)); + } else { + if (gradeHashmap.containsKey(name)) { + for (String grade : gradeHashmap.get(name)) { + Tuple3 outputTuple = new Tuple3( + name, grade, record.getInteger(2)); + outRecord.addTuple(outputTuple); + } + emit(outRecord); + outRecord.Clear(); + } + if (!salaryHashmap.containsKey(name)) { + salaryHashmap.put(name, new ArrayList()); + } + salaryHashmap.get(name).add(record.getInteger(2)); + } + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinLocal.java new file mode 100644 index 0000000000000000000000000000000000000000..80275268df30186d0113c061bae207b7429870a6 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinLocal.java @@ -0,0 +1,86 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.window.join; + +import java.net.InetSocketAddress; + +import org.apache.log4j.Level; + +import eu.stratosphere.client.minicluster.NepheleMiniCluster; +import eu.stratosphere.client.program.Client; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; +import eu.stratosphere.streaming.util.LogUtils; + +public class WindowJoinLocal { + + public static JobGraph getJobGraph() { + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE); + graphBuilder.setSource("WindowJoinSourceOne", WindowJoinSourceOne.class); + graphBuilder.setSource("WindowJoinSourceTwo", WindowJoinSourceTwo.class); + graphBuilder.setTask("WindowJoinTask", WindowJoinTask.class, 1, 1); + graphBuilder.setSink("WindowJoinSink", WindowJoinSink.class); + + graphBuilder.fieldsConnect("WindowJoinSourceOne", "WindowJoinTask", 1); + graphBuilder.fieldsConnect("WindowJoinSourceTwo", "WindowJoinTask", 1); + graphBuilder.shuffleConnect("WindowJoinTask", "WindowJoinSink"); + + return graphBuilder.getJobGraph(); + } + + public static void main(String[] args) { + + LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO); + + try { + + JobGraph jG = getJobGraph(); + Configuration configuration = jG.getJobConfiguration(); + + if (args.length == 0) { + args = new String[] { "local" }; + } + + if (args[0].equals("local")) { + System.out.println("Running in Local mode"); + NepheleMiniCluster exec = new NepheleMiniCluster(); + + exec.start(); + + Client client = new Client(new InetSocketAddress("localhost", 6498), configuration); + + client.run(jG, true); + + exec.stop(); + + } else if (args[0].equals("cluster")) { + System.out.println("Running in Cluster2 mode"); + + Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), + configuration); + + client.run(jG, true); + + } + + } catch (Exception e) { + System.out.println(e); + } + + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSink.java new file mode 100644 index 0000000000000000000000000000000000000000..497d13e607df03503a0e849e093f79845debb67a --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSink.java @@ -0,0 +1,36 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.window.join; + +import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class WindowJoinSink extends UserSinkInvokable { + private static final long serialVersionUID = 1L; + + @Override + public void invoke(StreamRecord record) throws Exception { + System.out.println("received record..."); + int tupleNum = record.getNumOfTuples(); + System.out.println("============================================"); + for (int i = 0; i < tupleNum; ++i) { + System.out.println("name=" + record.getField(i, 0) + ", grade=" + + record.getField(i, 1) + ", salary=" + + record.getField(i, 2)); + } + System.out.println("============================================"); + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceOne.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceOne.java new file mode 100644 index 0000000000000000000000000000000000000000..9cf60202b8e7477be1f60535185f8c56602f87cb --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceOne.java @@ -0,0 +1,47 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.window.join; + +import java.util.Random; + +import eu.stratosphere.api.java.tuple.Tuple4; +import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class WindowJoinSourceOne extends UserSourceInvokable { + + private static final long serialVersionUID = 6670933703432267728L; + + private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", + "sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge", + "black", "peter" }; + private Random rand = new Random(); + private StreamRecord outRecord = new StreamRecord( + new Tuple4()); + private long progress = 0L; + + @Override + public void invoke() throws Exception { + while (true) { + outRecord.setString(0, "salary"); + outRecord.setString(1, names[rand.nextInt(names.length)]); + outRecord.setInteger(2, rand.nextInt(10000)); + outRecord.setLong(3, progress); + emit(outRecord); + progress+=1; + } + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceTwo.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceTwo.java new file mode 100644 index 0000000000000000000000000000000000000000..6116e801b89dd97923da09d2eefffa56a504070c --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinSourceTwo.java @@ -0,0 +1,47 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.window.join; + +import java.util.Random; + +import eu.stratosphere.api.java.tuple.Tuple4; +import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class WindowJoinSourceTwo extends UserSourceInvokable { + + private static final long serialVersionUID = -5897483980082089771L; + + private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace", + "sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge", + "black", "peter" }; + private Random rand = new Random(); + private StreamRecord outRecord = new StreamRecord( + new Tuple4()); + private long progress = 0L; + + @Override + public void invoke() throws Exception { + while (true) { + outRecord.setString(0, "grade"); + outRecord.setString(1, names[rand.nextInt(names.length)]); + outRecord.setString(2, String.valueOf((char)(rand.nextInt(26)+'A'))); + outRecord.setLong(3, progress); + emit(outRecord); + progress+=1; + } + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinTask.java new file mode 100644 index 0000000000000000000000000000000000000000..a88148a7f57d27bef5f72ebe9651c12297b8e881 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/join/WindowJoinTask.java @@ -0,0 +1,115 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.window.join; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; + +import eu.stratosphere.api.java.tuple.Tuple3; +import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class WindowJoinTask extends UserTaskInvokable { + + class SalaryProgress { + public SalaryProgress(int salary, long progress) { + this.salary = salary; + this.progress = progress; + } + + int salary; + long progress; + } + + class GradeProgress { + public GradeProgress(String grade, long progress) { + this.grade = grade; + this.progress = progress; + } + + String grade; + long progress; + } + + private static final long serialVersionUID = 749913336259789039L; + private int windowSize = 100; + private HashMap> gradeHashmap; + private HashMap> salaryHashmap; + private StreamRecord outRecord = new StreamRecord(3); + + public WindowJoinTask() { + gradeHashmap = new HashMap>(); + salaryHashmap = new HashMap>(); + } + + @Override + public void invoke(StreamRecord record) throws Exception { + // TODO Auto-generated method stub + String streamId = record.getString(0); + String name = record.getString(1); + long progress = record.getLong(3); + if (streamId.equals("grade")) { + if (salaryHashmap.containsKey(name)) { + Iterator iterator = salaryHashmap.get(name) + .iterator(); + while (iterator.hasNext()) { + SalaryProgress entry = iterator.next(); + if (progress - entry.progress > windowSize) { + iterator.remove(); + } else { + Tuple3 outputTuple = new Tuple3( + name, record.getString(2), entry.salary); + outRecord.addTuple(outputTuple); + } + } + if (outRecord.getNumOfTuples() != 0) { + emit(outRecord); + } + outRecord.Clear(); + } + if (!gradeHashmap.containsKey(name)) { + gradeHashmap.put(name, new LinkedList()); + } + gradeHashmap.get(name).add( + new GradeProgress(record.getString(2), progress)); + } else { + if (gradeHashmap.containsKey(name)) { + Iterator iterator = gradeHashmap.get(name) + .iterator(); + while (iterator.hasNext()) { + GradeProgress entry = iterator.next(); + if (progress - entry.progress > windowSize) { + iterator.remove(); + } else { + Tuple3 outputTuple = new Tuple3( + name, entry.grade, record.getInteger(2)); + outRecord.addTuple(outputTuple); + } + } + if (outRecord.getNumOfTuples() != 0) { + emit(outRecord); + } + outRecord.Clear(); + } + if (!salaryHashmap.containsKey(name)) { + salaryHashmap.put(name, new LinkedList()); + } + salaryHashmap.get(name).add( + new SalaryProgress(record.getInteger(2), progress)); + } + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java index fe9e8dac8626e48bf7684a0ed3b8d3e1ab3c3b9f..5a7b55bfe92b9e4ffd71f6ef4af22aa443f7df26 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumAggregate.java @@ -19,7 +19,7 @@ import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.state.MutableTableState; -import eu.stratosphere.streaming.state.WindowState; +import eu.stratosphere.streaming.state.SlidingWindowState; public class WindowSumAggregate extends UserTaskInvokable { private static final long serialVersionUID = 1L; @@ -30,7 +30,7 @@ public class WindowSumAggregate extends UserTaskInvokable { private int windowFieldId = 1; private StreamRecord tempRecord; - private WindowState window; + private SlidingWindowState window; private MutableTableState sum; private long initTimestamp = -1; private long nextTimestamp = -1; @@ -39,7 +39,7 @@ public class WindowSumAggregate extends UserTaskInvokable { new Tuple2()); public WindowSumAggregate() { - window = new WindowState(windowSize, slidingStep, + window = new SlidingWindowState(windowSize, slidingStep, computeGranularity); sum = new MutableTableState(); sum.put("sum", 0); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java index 2df95b4f150cb8c3c45a6a17bc27782a24d1d8ce..020ae031aa619e2f343843a579a832b7ad60165e 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/sum/WindowSumLocal.java @@ -26,7 +26,6 @@ import eu.stratosphere.nephele.jobgraph.JobGraph; import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.util.LogUtils; -//TODO: window operator remains unfinished. public class WindowSumLocal { public static JobGraph getJobGraph() { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java index 4da15ce54dafe795cb762d5385e768a350a751d2..a2b8768553e5d11091735cdb4fb495f670cf4a64 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java @@ -21,7 +21,7 @@ import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.state.MutableTableState; import eu.stratosphere.streaming.state.MutableTableStateIterator; -import eu.stratosphere.streaming.state.WindowState; +import eu.stratosphere.streaming.state.SlidingWindowState; public class WindowWordCountCounter extends UserTaskInvokable { private static final long serialVersionUID = 1L; @@ -32,7 +32,7 @@ public class WindowWordCountCounter extends UserTaskInvokable { private int windowFieldId=2; private StreamRecord tempRecord; - private WindowState window; + private SlidingWindowState window; private MutableTableState wordCounts; private long initTimestamp=-1; private long nextTimestamp=-1; @@ -40,7 +40,7 @@ public class WindowWordCountCounter extends UserTaskInvokable { private StreamRecord outRecord = new StreamRecord(3); public WindowWordCountCounter() { - window = new WindowState(windowSize, slidingStep, + window = new SlidingWindowState(windowSize, slidingStep, computeGranularity); wordCounts = new MutableTableState(); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java index 202119a3bdb6969fe2412a9d42abefbfac477592..c240c609494a2b3a63e0364d05627820e0e39cfc 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountLocal.java @@ -27,7 +27,6 @@ import eu.stratosphere.streaming.api.JobGraphBuilder; import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; import eu.stratosphere.streaming.util.LogUtils; -//TODO: window operator remains unfinished. public class WindowWordCountLocal { public static JobGraph getJobGraph() { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/kafka/KafkaSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/kafka/KafkaSource.java new file mode 100644 index 0000000000000000000000000000000000000000..471f23b50df3fb01cfcb054b53a23d7e64d68af5 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/kafka/KafkaSource.java @@ -0,0 +1,87 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.kafka; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import kafka.consumer.ConsumerConfig; +import kafka.consumer.ConsumerIterator; +import kafka.consumer.KafkaStream; +import kafka.javaapi.consumer.ConsumerConnector; +import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +/** + * Source for reading messages from a Kafka queue. The source currently only + * support string messages. Other types will be added soon. + * + */ +public class KafkaSource extends UserSourceInvokable { + private static final long serialVersionUID = 1L; + + private final String zkQuorum; + private final String groupId; + private final String topicId; + private final int numThreads; + private ConsumerConnector consumer; + + StreamRecord record = new StreamRecord(new Tuple1()); + + public KafkaSource(String zkQuorum, String groupId, String topicId, + int numThreads) { + this.zkQuorum = zkQuorum; + this.groupId = groupId; + this.topicId = topicId; + this.numThreads = numThreads; + } + + private void initializeConnection() { + Properties props = new Properties(); + props.put("zookeeper.connect", zkQuorum); + props.put("group.id", groupId); + props.put("zookeeper.session.timeout.ms", "400"); + props.put("zookeeper.sync.time.ms", "200"); + props.put("auto.commit.interval.ms", "1000"); + consumer = kafka.consumer.Consumer + .createJavaConsumerConnector(new ConsumerConfig(props)); + } + + @Override + public void invoke() { + initializeConnection(); + + Map topicCountMap = new HashMap(); + topicCountMap.put(topicId, numThreads); + Map>> consumerMap = consumer + .createMessageStreams(topicCountMap); + KafkaStream stream = consumerMap.get(topicId).get(0); + ConsumerIterator it = stream.iterator(); + + while (it.hasNext()) { + String message = new String(it.next().message()); + if (message.equals("q")) { + break; + } + record.setString(0, message); + emit(record); + } + + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/kafka/KafkaTopology.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/kafka/KafkaTopology.java new file mode 100644 index 0000000000000000000000000000000000000000..972a043beeef8b27048999e0421b6628e73ac4a2 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/kafka/KafkaTopology.java @@ -0,0 +1,56 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.kafka; + +import org.apache.log4j.Level; + +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.faulttolerance.FaultToleranceType; +import eu.stratosphere.streaming.util.ClusterUtil; +import eu.stratosphere.streaming.util.LogUtils; + +public class KafkaTopology { + + public static class Sink extends UserSinkInvokable { + private static final long serialVersionUID = 1L; + + @Override + public void invoke(StreamRecord record) throws Exception { + System.out.println(record.getString(0)); + } + } + + private static JobGraph getJobGraph() { + + JobGraphBuilder graphBuilder = new JobGraphBuilder("RMQ", FaultToleranceType.NONE); + graphBuilder.setSource("Source", new KafkaSource("localhost:7077", "group", "topic", 1), 1, 1); + graphBuilder.setSink("Sink", new Sink(), 1, 1); + + graphBuilder.shuffleConnect("Source", "Sink"); + + return graphBuilder.getJobGraph(); + } + + public static void main(String[] args) { + + LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO); + ClusterUtil.runOnMiniCluster(getJobGraph()); + + } +} \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/SlidingWindowState.java similarity index 94% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/SlidingWindowState.java index 0f01e24bf9f95ac15a0ccddaef6e68a883d4b3cc..39342a40a79eb245819da183d969c844abba7be2 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/SlidingWindowState.java @@ -25,14 +25,14 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord; * compose time based window operator by extending this class by splitting the * stream into multiple mini batches. */ -public class WindowState { +public class SlidingWindowState { private int currentRecordCount; private int fullRecordCount; private int slideRecordCount; CircularFifoBuffer buffer; - public WindowState(int windowSize, int slidingStep, int computeGranularity) { + public SlidingWindowState(int windowSize, int slidingStep, int computeGranularity) { this.currentRecordCount = 0; // here we assume that windowSize and slidingStep is divisible by // computeGranularity. diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowStateIterator.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/SlidingWindowStateIterator.java similarity index 96% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowStateIterator.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/SlidingWindowStateIterator.java index b43a17a992f78ad32c60828cd318427826e46db7..28bfb3d77323b18211804411a21726841cf5db17 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowStateIterator.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/SlidingWindowStateIterator.java @@ -18,7 +18,7 @@ package eu.stratosphere.streaming.state; import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; -public class WindowStateIterator{ +public class SlidingWindowStateIterator{ public boolean hasNext() { // TODO Auto-generated method stub diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/state/InternalStateTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/state/InternalStateTest.java index 0fd88cc4772c89b9b9d9ea6d70fe335cdecae84f..b7ae44d6d8f2f22645ed3d90d012dfcff65036cd 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/state/InternalStateTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/state/InternalStateTest.java @@ -21,7 +21,7 @@ import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.streaming.state.LogTableState; import eu.stratosphere.streaming.state.MutableTableState; import eu.stratosphere.streaming.state.TableStateIterator; -import eu.stratosphere.streaming.state.WindowState; +import eu.stratosphere.streaming.state.SlidingWindowState; public class InternalStateTest { @@ -83,7 +83,7 @@ public class InternalStateTest { @Test public void WindowStateTest(){ - WindowState state=new WindowState(100, 20, 10); + SlidingWindowState state=new SlidingWindowState(100, 20, 10); } }