提交 26fcfeb5 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] add kafka topology

上级 ee314940
......@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>eu.stratosphere</groupId>
<version>0.5</version>
<version>0.5-rc2</version>
<artifactId>stratosphere-streaming</artifactId>
<name>stratosphere-streaming</name>
......@@ -83,6 +83,11 @@
<artifactId>amqp-client</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
</dependency>
</dependencies>
<build>
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.api.streamcomponent;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.WindowState;
import eu.stratosphere.streaming.state.SlidingWindowState;
public class StreamWindowTask extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
......@@ -27,7 +27,7 @@ public class StreamWindowTask extends UserTaskInvokable {
private int windowFieldId = 1;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private SlidingWindowState<Integer> window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
......@@ -36,7 +36,7 @@ public class StreamWindowTask extends UserTaskInvokable {
int computeGranularity, int windowFieldId) {
this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId;
window = new WindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
/**
 * Launcher for the iterative example topology.
 *
 * <p>Assembles a job graph from {@link IterativeSource}, {@link IterativeParallel},
 * {@link IterativeStateHolder} and {@link IterativeSink} and submits it either to an
 * in-process {@link NepheleMiniCluster} or to a remote job manager.
 */
public class IterativeLocal {

    /**
     * Builds the job graph of the iterative example.
     *
     * <p>The graph is named "testGraph" and uses {@link FaultToleranceType#NONE}. The wiring is
     * source -> parallel task -> state holder -> sink; the first two edges use
     * {@code fieldsConnect} on field index 1 and the last edge uses {@code globalConnect}.
     *
     * @return the assembled job graph
     */
    public static JobGraph getJobGraph() {
        JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);

        graphBuilder.setSource("IterativeSource", IterativeSource.class);
        // NOTE(review): the two trailing 1s are presumably parallelism settings - confirm
        // against JobGraphBuilder.setTask.
        graphBuilder.setTask("IterativeParallel", IterativeParallel.class, 1, 1);
        graphBuilder.setTask("IterativeStateHolder", IterativeStateHolder.class);
        graphBuilder.setSink("IterativeSink", IterativeSink.class);

        graphBuilder.fieldsConnect("IterativeSource", "IterativeParallel", 1);
        graphBuilder.fieldsConnect("IterativeParallel", "IterativeStateHolder", 1);
        graphBuilder.globalConnect("IterativeStateHolder", "IterativeSink");

        return graphBuilder.getJobGraph();
    }

    /**
     * Runs the example.
     *
     * @param args optional; {@code args[0]} selects the mode: {@code "local"} (also used when
     *             no argument is given) starts a mini cluster in this JVM, {@code "cluster"}
     *             submits the job to the hard-coded remote job manager. Any other value is
     *             reported on standard error and nothing is executed.
     */
    public static void main(String[] args) {
        LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);

        try {
            JobGraph jG = getJobGraph();
            Configuration configuration = jG.getJobConfiguration();

            String mode = args.length == 0 ? "local" : args[0];

            if (mode.equals("local")) {
                System.out.println("Running in Local mode");
                NepheleMiniCluster exec = new NepheleMiniCluster();
                exec.start();
                try {
                    Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
                    client.run(jG, true);
                } finally {
                    // Stop the mini cluster even when the job submission or execution fails;
                    // otherwise it would be left running.
                    exec.stop();
                }
            } else if (mode.equals("cluster")) {
                System.out.println("Running in Cluster2 mode");
                Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
                        configuration);
                client.run(jG, true);
            } else {
                // Previously an unrecognised mode silently did nothing.
                System.err.println("Unknown mode \"" + mode + "\"; expected \"local\" or \"cluster\"");
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Task of the iterative example that is wired between the source and the state holder.
 *
 * <p>This is still a placeholder: records handed to {@link #invoke(StreamRecord)} are ignored
 * and nothing is emitted.
 */
public class IterativeParallel extends UserTaskInvokable {

    private static final long serialVersionUID = -3042489460184024483L;

    /**
     * Receives one record from the upstream component and currently discards it.
     *
     * @param record the incoming record (unused)
     * @throws Exception never thrown by the current implementation
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        // TODO: processing logic has not been implemented yet; the record is dropped.
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Sink of the iterative example; writes every tuple of a received record to standard output.
 */
public class IterativeSink extends UserSinkInvokable {

    private static final long serialVersionUID = -1989637817643875304L;

    /** Line printed before and after the tuples of one record. */
    private static final String SEPARATOR = "============================================";

    /**
     * Prints fields 0, 1 and 2 of each tuple in the record, framed by separator lines.
     *
     * <p>NOTE(review): the labels name/grade/salary look carried over from the join example -
     * confirm they match what the iterative topology will emit.
     *
     * @param record the record to print
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        System.out.println("received record...");
        final int tupleCount = record.getNumOfTuples();
        System.out.println(SEPARATOR);

        int index = 0;
        while (index < tupleCount) {
            String line = "name=" + record.getField(index, 0)
                    + ", grade=" + record.getField(index, 1)
                    + ", salary=" + record.getField(index, 2);
            System.out.println(line);
            index++;
        }

        System.out.println(SEPARATOR);
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
/**
 * Source of the iterative example.
 *
 * <p>This is still a placeholder: {@link #invoke()} returns immediately without emitting
 * any record.
 */
public class IterativeSource extends UserSourceInvokable {

    private static final long serialVersionUID = 8983174839600079890L;

    /**
     * Entry point of the source; currently does nothing.
     *
     * @throws Exception never thrown by the current implementation
     */
    @Override
    public void invoke() throws Exception {
        // TODO: not implemented yet - this source emits nothing.
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Task of the iterative example that is wired between the parallel task and the sink.
 *
 * <p>This is still a placeholder: records handed to {@link #invoke(StreamRecord)} are ignored
 * and nothing is emitted.
 */
public class IterativeStateHolder extends UserTaskInvokable {

    private static final long serialVersionUID = -3042489460184024483L;

    /**
     * Receives one record from the upstream component and currently discards it.
     *
     * @param record the incoming record (unused)
     * @throws Exception never thrown by the current implementation
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        // TODO: state handling has not been implemented yet; the record is dropped.
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.collaborativefilter;
/**
 * Placeholder launcher for the collaborative-filtering example.
 *
 * <p>No topology is built yet; running this class has no effect.
 */
public class CollaborativeFilteringLocal {

    /**
     * Entry point; currently returns immediately.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        // TODO: build and submit the collaborative-filtering job graph.
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.kmeans;
/**
 * Placeholder launcher for the k-means example.
 *
 * <p>No topology is built yet; running this class has no effect.
 */
public class KMeansLocal {

    /**
     * Entry point; currently returns immediately.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        // TODO: build and submit the k-means job graph.
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.pagerank;
/**
 * Placeholder launcher for the PageRank example.
 *
 * <p>No topology is built yet; running this class has no effect.
 */
public class PagerankLocal {

    /**
     * Entry point; currently returns immediately.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        // TODO: build and submit the PageRank job graph.
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.iterative.sssp;
/**
 * Placeholder launcher for the single-source-shortest-path example.
 *
 * <p>No topology is built yet; running this class has no effect.
 */
public class SSSPLocal {

    /**
     * Entry point; currently returns immediately.
     *
     * @param args command-line arguments (ignored)
     */
    public static void main(String[] args) {
        // TODO: build and submit the SSSP job graph.
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.join;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
/**
 * Launcher for the (non-windowed) stream join example.
 *
 * <p>Assembles a job graph from {@link JoinSourceOne}, {@link JoinSourceTwo}, {@link JoinTask}
 * and {@link JoinSink} and submits it either to an in-process {@link NepheleMiniCluster} or to
 * a remote job manager.
 */
public class JoinLocal {

    /**
     * Builds the job graph of the join example.
     *
     * <p>The graph is named "testGraph" and uses {@link FaultToleranceType#NONE}. Both sources
     * feed the join task through {@code fieldsConnect} on field index 1 (the field in which the
     * sources store the person's name); the join task feeds the sink through
     * {@code shuffleConnect}.
     *
     * @return the assembled job graph
     */
    public static JobGraph getJobGraph() {
        JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);

        graphBuilder.setSource("JoinSourceOne", JoinSourceOne.class);
        graphBuilder.setSource("JoinSourceTwo", JoinSourceTwo.class);
        // NOTE(review): the two trailing 1s are presumably parallelism settings - confirm
        // against JobGraphBuilder.setTask.
        graphBuilder.setTask("JoinTask", JoinTask.class, 1, 1);
        graphBuilder.setSink("JoinSink", JoinSink.class);

        graphBuilder.fieldsConnect("JoinSourceOne", "JoinTask", 1);
        graphBuilder.fieldsConnect("JoinSourceTwo", "JoinTask", 1);
        graphBuilder.shuffleConnect("JoinTask", "JoinSink");

        return graphBuilder.getJobGraph();
    }

    /**
     * Runs the example.
     *
     * @param args optional; {@code args[0]} selects the mode: {@code "local"} (also used when
     *             no argument is given) starts a mini cluster in this JVM, {@code "cluster"}
     *             submits the job to the hard-coded remote job manager. Any other value is
     *             reported on standard error and nothing is executed.
     */
    public static void main(String[] args) {
        LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);

        try {
            JobGraph jG = getJobGraph();
            Configuration configuration = jG.getJobConfiguration();

            String mode = args.length == 0 ? "local" : args[0];

            if (mode.equals("local")) {
                System.out.println("Running in Local mode");
                NepheleMiniCluster exec = new NepheleMiniCluster();
                exec.start();
                try {
                    Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
                    client.run(jG, true);
                } finally {
                    // Stop the mini cluster even when the job submission or execution fails;
                    // otherwise it would be left running.
                    exec.stop();
                }
            } else if (mode.equals("cluster")) {
                System.out.println("Running in Cluster2 mode");
                Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
                        configuration);
                client.run(jG, true);
            } else {
                // Previously an unrecognised mode silently did nothing.
                System.err.println("Unknown mode \"" + mode + "\"; expected \"local\" or \"cluster\"");
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.join;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Sink of the join example; writes every joined tuple of a received record to standard output.
 */
public class JoinSink extends UserSinkInvokable {

    private static final long serialVersionUID = 1L;

    /** Line printed before and after the tuples of one record. */
    private static final String SEPARATOR = "============================================";

    /**
     * Prints fields 0, 1 and 2 of each tuple in the record as name, grade and salary,
     * framed by separator lines.
     *
     * @param record the record to print
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        System.out.println("received record...");
        final int tupleCount = record.getNumOfTuples();
        System.out.println(SEPARATOR);

        int index = 0;
        while (index < tupleCount) {
            String line = "name=" + record.getField(index, 0)
                    + ", grade=" + record.getField(index, 1)
                    + ", salary=" + record.getField(index, 2);
            System.out.println(line);
            index++;
        }

        System.out.println(SEPARATOR);
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * First source of the join example: an endless stream of random salary records.
 *
 * <p>Every emitted record carries the stream tag "salary" in field 0, a name picked at random
 * from a fixed list in field 1 and a random salary in the range [0, 10000) in field 2.
 */
public class JoinSourceOne extends UserSourceInvokable {

    private static final long serialVersionUID = 6670933703432267728L;

    private String[] names = {
        "tom", "jerry", "alice", "bob", "john",
        "grace", "sasa", "lawrance", "andrew", "jean",
        "richard", "smith", "gorge", "black", "peter"
    };

    private Random rand = new Random();

    // A single record instance is refilled and emitted on every iteration.
    private StreamRecord outRecord = new StreamRecord(new Tuple3<String, String, Integer>());

    /**
     * Emits salary records forever; this method never returns normally.
     */
    @Override
    public void invoke() throws Exception {
        for (;;) {
            outRecord.setString(0, "salary");

            String person = names[rand.nextInt(names.length)];
            outRecord.setString(1, person);

            int salary = rand.nextInt(10000);
            outRecord.setInteger(2, salary);

            emit(outRecord);
        }
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Second source of the join example: an endless stream of random grade records.
 *
 * <p>Every emitted record carries the stream tag "grade" in field 0, a name picked at random
 * from a fixed list in field 1 and a random one-letter grade 'A'..'Z' (as a string) in field 2.
 */
public class JoinSourceTwo extends UserSourceInvokable {

    private static final long serialVersionUID = -5897483980082089771L;

    private String[] names = {
        "tom", "jerry", "alice", "bob", "john",
        "grace", "sasa", "lawrance", "andrew", "jean",
        "richard", "smith", "gorge", "black", "peter"
    };

    private Random rand = new Random();

    // A single record instance is refilled and emitted on every iteration.
    private StreamRecord outRecord = new StreamRecord(new Tuple3<String, String, String>());

    /**
     * Emits grade records forever; this method never returns normally.
     */
    @Override
    public void invoke() throws Exception {
        for (;;) {
            outRecord.setString(0, "grade");

            String person = names[rand.nextInt(names.length)];
            outRecord.setString(1, person);

            // Offset a random index 0..25 from 'A' to obtain an upper-case letter.
            char letter = (char) (rand.nextInt(26) + 'A');
            outRecord.setString(2, String.valueOf(letter));

            emit(outRecord);
        }
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.join;
import java.util.ArrayList;
import java.util.HashMap;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Joins the grade stream and the salary stream on the person's name.
 *
 * <p>Field 0 of an incoming record is the stream tag and field 1 the name. Every value seen so
 * far is remembered per name; when a record arrives, it is combined with all remembered values
 * of the opposite stream for the same name and the resulting (name, grade, salary) tuples are
 * emitted together in one record. Nothing is ever evicted from the two maps.
 */
public class JoinTask extends UserTaskInvokable {

    private static final long serialVersionUID = 749913336259789039L;

    private HashMap<String, ArrayList<String>> gradeHashmap;
    private HashMap<String, ArrayList<Integer>> salaryHashmap;

    // Reused output buffer: filled, emitted and cleared for every match.
    private StreamRecord outRecord = new StreamRecord(3);

    public JoinTask() {
        gradeHashmap = new HashMap<String, ArrayList<String>>();
        salaryHashmap = new HashMap<String, ArrayList<Integer>>();
    }

    /**
     * Dispatches the record by its stream tag: "grade" records go to the grade side, every
     * other tag is treated as a salary record.
     *
     * @param record incoming record with tag in field 0, name in field 1, value in field 2
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        String streamId = record.getString(0);
        String name = record.getString(1);

        if (streamId.equals("grade")) {
            joinGrade(record, name);
        } else {
            joinSalary(record, name);
        }
    }

    /** Joins a grade record against the stored salaries of the name, then stores the grade. */
    private void joinGrade(StreamRecord record, String name) throws Exception {
        ArrayList<Integer> knownSalaries = salaryHashmap.get(name);
        if (knownSalaries != null) {
            for (Integer salary : knownSalaries) {
                outRecord.addTuple(new Tuple3<String, String, Integer>(name, record.getString(2), salary));
            }
            emit(outRecord);
            outRecord.Clear();
        }

        ArrayList<String> grades = gradeHashmap.get(name);
        if (grades == null) {
            grades = new ArrayList<String>();
            gradeHashmap.put(name, grades);
        }
        grades.add(record.getString(2));
    }

    /** Joins a salary record against the stored grades of the name, then stores the salary. */
    private void joinSalary(StreamRecord record, String name) throws Exception {
        ArrayList<String> knownGrades = gradeHashmap.get(name);
        if (knownGrades != null) {
            for (String grade : knownGrades) {
                outRecord.addTuple(new Tuple3<String, String, Integer>(name, grade, record.getInteger(2)));
            }
            emit(outRecord);
            outRecord.Clear();
        }

        ArrayList<Integer> salaries = salaryHashmap.get(name);
        if (salaries == null) {
            salaries = new ArrayList<Integer>();
            salaryHashmap.put(name, salaries);
        }
        salaries.add(record.getInteger(2));
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
/**
 * Launcher for the windowed stream join example.
 *
 * <p>Assembles a job graph from {@link WindowJoinSourceOne}, {@link WindowJoinSourceTwo},
 * {@link WindowJoinTask} and {@link WindowJoinSink} and submits it either to an in-process
 * {@link NepheleMiniCluster} or to a remote job manager.
 */
public class WindowJoinLocal {

    /**
     * Builds the job graph of the window join example.
     *
     * <p>The graph is named "testGraph" and uses {@link FaultToleranceType#NONE}. Both sources
     * feed the join task through {@code fieldsConnect} on field index 1 (the field in which the
     * sources store the person's name); the join task feeds the sink through
     * {@code shuffleConnect}.
     *
     * @return the assembled job graph
     */
    public static JobGraph getJobGraph() {
        JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph", FaultToleranceType.NONE);

        graphBuilder.setSource("WindowJoinSourceOne", WindowJoinSourceOne.class);
        graphBuilder.setSource("WindowJoinSourceTwo", WindowJoinSourceTwo.class);
        // NOTE(review): the two trailing 1s are presumably parallelism settings - confirm
        // against JobGraphBuilder.setTask.
        graphBuilder.setTask("WindowJoinTask", WindowJoinTask.class, 1, 1);
        graphBuilder.setSink("WindowJoinSink", WindowJoinSink.class);

        graphBuilder.fieldsConnect("WindowJoinSourceOne", "WindowJoinTask", 1);
        graphBuilder.fieldsConnect("WindowJoinSourceTwo", "WindowJoinTask", 1);
        graphBuilder.shuffleConnect("WindowJoinTask", "WindowJoinSink");

        return graphBuilder.getJobGraph();
    }

    /**
     * Runs the example.
     *
     * @param args optional; {@code args[0]} selects the mode: {@code "local"} (also used when
     *             no argument is given) starts a mini cluster in this JVM, {@code "cluster"}
     *             submits the job to the hard-coded remote job manager. Any other value is
     *             reported on standard error and nothing is executed.
     */
    public static void main(String[] args) {
        LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);

        try {
            JobGraph jG = getJobGraph();
            Configuration configuration = jG.getJobConfiguration();

            String mode = args.length == 0 ? "local" : args[0];

            if (mode.equals("local")) {
                System.out.println("Running in Local mode");
                NepheleMiniCluster exec = new NepheleMiniCluster();
                exec.start();
                try {
                    Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
                    client.run(jG, true);
                } finally {
                    // Stop the mini cluster even when the job submission or execution fails;
                    // otherwise it would be left running.
                    exec.stop();
                }
            } else if (mode.equals("cluster")) {
                System.out.println("Running in Cluster2 mode");
                Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
                        configuration);
                client.run(jG, true);
            } else {
                // Previously an unrecognised mode silently did nothing.
                System.err.println("Unknown mode \"" + mode + "\"; expected \"local\" or \"cluster\"");
            }
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Sink of the window join example; writes every tuple of a received record to standard output.
 */
public class WindowJoinSink extends UserSinkInvokable {

    private static final long serialVersionUID = 1L;

    /** Line printed before and after the tuples of one record. */
    private static final String SEPARATOR = "============================================";

    /**
     * Prints fields 0, 1 and 2 of each tuple in the record, labelled name, grade and salary
     * and framed by separator lines.
     *
     * @param record the record to print
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        System.out.println("received record...");
        final int tupleCount = record.getNumOfTuples();
        System.out.println(SEPARATOR);

        int index = 0;
        while (index < tupleCount) {
            String line = "name=" + record.getField(index, 0)
                    + ", grade=" + record.getField(index, 1)
                    + ", salary=" + record.getField(index, 2);
            System.out.println(line);
            index++;
        }

        System.out.println(SEPARATOR);
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * First source of the window join example: an endless stream of random salary records.
 *
 * <p>Every emitted record carries the stream tag "salary" in field 0, a name picked at random
 * from a fixed list in field 1, a random salary in the range [0, 10000) in field 2 and a
 * counter in field 3 that starts at 0 and grows by one per emitted record.
 */
public class WindowJoinSourceOne extends UserSourceInvokable {

    private static final long serialVersionUID = 6670933703432267728L;

    private String[] names = {
        "tom", "jerry", "alice", "bob", "john",
        "grace", "sasa", "lawrance", "andrew", "jean",
        "richard", "smith", "gorge", "black", "peter"
    };

    private Random rand = new Random();

    // A single record instance is refilled and emitted on every iteration.
    private StreamRecord outRecord = new StreamRecord(new Tuple4<String, String, Integer, Long>());

    // Number of records emitted so far; written into field 3 of each record.
    private long progress = 0L;

    /**
     * Emits salary records forever; this method never returns normally.
     */
    @Override
    public void invoke() throws Exception {
        for (;;) {
            outRecord.setString(0, "salary");

            String person = names[rand.nextInt(names.length)];
            outRecord.setString(1, person);

            int salary = rand.nextInt(10000);
            outRecord.setInteger(2, salary);

            outRecord.setLong(3, progress);
            emit(outRecord);
            progress++;
        }
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
import java.util.Random;
import eu.stratosphere.api.java.tuple.Tuple4;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Endless generator of synthetic "grade" records for the window join example.
 *
 * Every emitted record carries four fields: the stream tag "grade", a name
 * picked at random from a fixed list, a one-character grade chosen at random
 * from 'A' to 'Z' and a progress counter that grows by one with each emitted
 * record.
 */
public class WindowJoinSourceTwo extends UserSourceInvokable {

    private static final long serialVersionUID = -5897483980082089771L;

    // Pool of names the generated records are drawn from.
    private String[] names = { "tom", "jerry", "alice", "bob", "john", "grace",
            "sasa", "lawrance", "andrew", "jean", "richard", "smith", "gorge",
            "black", "peter" };

    private Random rand = new Random();

    // A single record instance is refilled and re-emitted on every iteration.
    private StreamRecord outRecord = new StreamRecord(
            new Tuple4<String, String, String, Long>());

    // Number of records emitted so far; written into field 3 of each record.
    private long progress = 0L;

    /**
     * Fills and emits records in a loop that has no exit condition; it only
     * ends if one of the calls inside it throws.
     */
    @Override
    public void invoke() throws Exception {
        for (;;) {
            outRecord.setString(0, "grade");

            String pickedName = names[rand.nextInt(names.length)];
            outRecord.setString(1, pickedName);

            // Offset 0..25 added to 'A' yields one of the letters 'A'..'Z'.
            char gradeLetter = (char) (rand.nextInt(26) + 'A');
            outRecord.setString(2, String.valueOf(gradeLetter));

            outRecord.setLong(3, progress);
            emit(outRecord);
            progress++;
        }
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.join;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Joins the "grade" and "salary" streams of the window join example on the
 * name field.
 *
 * Expected input record layout: field 0 is the stream id, field 1 the name,
 * field 2 the payload (a String grade for "grade" records, an Integer salary
 * otherwise) and field 3 a long progress counter. Any record whose stream id
 * is not "grade" is handled as a salary record.
 *
 * For every incoming record the buffered entries of the opposite stream with
 * the same name are probed: entries whose progress lags the incoming record by
 * more than {@code windowSize} are dropped, all remaining ones produce a
 * (name, grade, salary) tuple. The incoming record is then buffered itself.
 */
public class WindowJoinTask extends UserTaskInvokable {

    /**
     * Buffered salary together with the progress counter of its record.
     * Declared static so that buffered entries do not each keep a hidden
     * reference to the enclosing task.
     */
    static class SalaryProgress {
        public SalaryProgress(int salary, long progress) {
            this.salary = salary;
            this.progress = progress;
        }

        int salary;
        long progress;
    }

    /**
     * Buffered grade together with the progress counter of its record.
     * Declared static so that buffered entries do not each keep a hidden
     * reference to the enclosing task.
     */
    static class GradeProgress {
        public GradeProgress(String grade, long progress) {
            this.grade = grade;
            this.progress = progress;
        }

        String grade;
        long progress;
    }

    private static final long serialVersionUID = 749913336259789039L;

    // Maximum allowed progress lag before a buffered entry is evicted.
    private int windowSize = 100;
    // Buffered entries of each stream, keyed by name.
    private HashMap<String, LinkedList<GradeProgress>> gradeHashmap;
    private HashMap<String, LinkedList<SalaryProgress>> salaryHashmap;
    // Reused output record; filled with join results and cleared after each probe.
    private StreamRecord outRecord = new StreamRecord(3);

    public WindowJoinTask() {
        gradeHashmap = new HashMap<String, LinkedList<GradeProgress>>();
        salaryHashmap = new HashMap<String, LinkedList<SalaryProgress>>();
    }

    /**
     * Probes the opposite stream's buffer for the record's name, emits the
     * join results (if any) and buffers the record.
     *
     * @param record incoming grade or salary record
     * @throws Exception if reading the record or emitting the result fails
     */
    @Override
    public void invoke(StreamRecord record) throws Exception {
        String streamId = record.getString(0);
        String name = record.getString(1);
        long progress = record.getLong(3);

        if (streamId.equals("grade")) {
            String grade = record.getString(2);

            LinkedList<SalaryProgress> salaries = salaryHashmap.get(name);
            if (salaries != null) {
                Iterator<SalaryProgress> iterator = salaries.iterator();
                while (iterator.hasNext()) {
                    SalaryProgress entry = iterator.next();
                    if (progress - entry.progress > windowSize) {
                        // Entry fell out of the window relative to this record.
                        iterator.remove();
                    } else {
                        outRecord.addTuple(new Tuple3<String, String, Integer>(
                                name, grade, entry.salary));
                    }
                }
                emitJoinedTuples();
            }

            LinkedList<GradeProgress> grades = gradeHashmap.get(name);
            if (grades == null) {
                grades = new LinkedList<GradeProgress>();
                gradeHashmap.put(name, grades);
            }
            grades.add(new GradeProgress(grade, progress));
        } else {
            Integer salary = record.getInteger(2);

            LinkedList<GradeProgress> grades = gradeHashmap.get(name);
            if (grades != null) {
                Iterator<GradeProgress> iterator = grades.iterator();
                while (iterator.hasNext()) {
                    GradeProgress entry = iterator.next();
                    if (progress - entry.progress > windowSize) {
                        // Entry fell out of the window relative to this record.
                        iterator.remove();
                    } else {
                        outRecord.addTuple(new Tuple3<String, String, Integer>(
                                name, entry.grade, salary));
                    }
                }
                emitJoinedTuples();
            }

            LinkedList<SalaryProgress> salaries = salaryHashmap.get(name);
            if (salaries == null) {
                salaries = new LinkedList<SalaryProgress>();
                salaryHashmap.put(name, salaries);
            }
            salaries.add(new SalaryProgress(salary, progress));
        }
    }

    /** Emits the output record if it holds at least one tuple, then clears it. */
    private void emitJoinedTuples() throws Exception {
        if (outRecord.getNumOfTuples() != 0) {
            emit(outRecord);
        }
        outRecord.Clear();
    }
}
......@@ -19,7 +19,7 @@ import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.WindowState;
import eu.stratosphere.streaming.state.SlidingWindowState;
public class WindowSumAggregate extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
......@@ -30,7 +30,7 @@ public class WindowSumAggregate extends UserTaskInvokable {
private int windowFieldId = 1;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private SlidingWindowState<Integer> window;
private MutableTableState<String, Integer> sum;
private long initTimestamp = -1;
private long nextTimestamp = -1;
......@@ -39,7 +39,7 @@ public class WindowSumAggregate extends UserTaskInvokable {
new Tuple2<Integer, Long>());
public WindowSumAggregate() {
window = new WindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
computeGranularity);
sum = new MutableTableState<String, Integer>();
sum.put("sum", 0);
......
......@@ -26,7 +26,6 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowSumLocal {
public static JobGraph getJobGraph() {
......
......@@ -21,7 +21,7 @@ import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
import eu.stratosphere.streaming.state.WindowState;
import eu.stratosphere.streaming.state.SlidingWindowState;
public class WindowWordCountCounter extends UserTaskInvokable {
private static final long serialVersionUID = 1L;
......@@ -32,7 +32,7 @@ public class WindowWordCountCounter extends UserTaskInvokable {
private int windowFieldId=2;
private StreamRecord tempRecord;
private WindowState<Integer> window;
private SlidingWindowState<Integer> window;
private MutableTableState<String, Integer> wordCounts;
private long initTimestamp=-1;
private long nextTimestamp=-1;
......@@ -40,7 +40,7 @@ public class WindowWordCountCounter extends UserTaskInvokable {
private StreamRecord outRecord = new StreamRecord(3);
public WindowWordCountCounter() {
window = new WindowState<Integer>(windowSize, slidingStep,
window = new SlidingWindowState<Integer>(windowSize, slidingStep,
computeGranularity);
wordCounts = new MutableTableState<String, Integer>();
}
......
......@@ -27,7 +27,6 @@ import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.LogUtils;
//TODO: window operator remains unfinished.
public class WindowWordCountLocal {
public static JobGraph getJobGraph() {
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Source for reading messages from a Kafka queue. The source currently only
 * supports string messages. Other types will be added soon.
 *
 * Each message is decoded as UTF-8 and emitted as a single-field record.
 * Receiving the message "q" ends the source.
 */
public class KafkaSource extends UserSourceInvokable {
    private static final long serialVersionUID = 1L;

    // Message payloads are decoded explicitly as UTF-8 rather than with the
    // platform default charset, so the result does not depend on the host's
    // locale settings. Fully qualified so that no additional import is needed.
    private static final java.nio.charset.Charset MESSAGE_CHARSET =
            java.nio.charset.Charset.forName("UTF-8");

    private final String zkQuorum;
    private final String groupId;
    private final String topicId;
    private final int numThreads;
    private ConsumerConnector consumer;

    // A single record instance is refilled and re-emitted for every message.
    StreamRecord record = new StreamRecord(new Tuple1<String>());

    /**
     * @param zkQuorum   value used for the consumer's "zookeeper.connect" property
     * @param groupId    value used for the consumer's "group.id" property
     * @param topicId    topic to read from
     * @param numThreads number of streams requested for the topic
     */
    public KafkaSource(String zkQuorum, String groupId, String topicId,
            int numThreads) {
        this.zkQuorum = zkQuorum;
        this.groupId = groupId;
        this.topicId = topicId;
        this.numThreads = numThreads;
    }

    /** Creates the consumer connector from the configured connection settings. */
    private void initializeConnection() {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkQuorum);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(new ConsumerConfig(props));
    }

    /**
     * Connects to Kafka and emits one record per received message until the
     * message "q" arrives or the stream ends. The consumer connector is shut
     * down on every exit path, including exceptions.
     */
    @Override
    public void invoke() {
        initializeConnection();
        try {
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topicId, numThreads);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                    .createMessageStreams(topicCountMap);

            // NOTE(review): numThreads streams are requested but only the first
            // one is read here - confirm whether values above 1 are meant to be
            // supported.
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topicId).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            while (it.hasNext()) {
                String message = new String(it.next().message(), MESSAGE_CHARSET);
                if (message.equals("q")) {
                    break;
                }
                record.setString(0, message);
                emit(record);
            }
        } finally {
            // Release the connector created in initializeConnection().
            consumer.shutdown();
        }
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.kafka;
import org.apache.log4j.Level;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceType;
import eu.stratosphere.streaming.util.ClusterUtil;
import eu.stratosphere.streaming.util.LogUtils;
/**
 * Example topology that connects a {@link KafkaSource} to a sink printing
 * every received message to standard output, and runs it on a mini cluster.
 */
public class KafkaTopology {

    /** Sink that prints field 0 of each incoming record to standard output. */
    public static class Sink extends UserSinkInvokable {
        private static final long serialVersionUID = 1L;

        @Override
        public void invoke(StreamRecord record) throws Exception {
            System.out.println(record.getString(0));
        }
    }

    /** Builds the job graph: one Kafka source shuffle-connected to one sink. */
    private static JobGraph getJobGraph() {
        // The job is named after what it does; the previous name "RMQ" did not
        // match a Kafka topology.
        JobGraphBuilder graphBuilder = new JobGraphBuilder("Kafka", FaultToleranceType.NONE);
        // NOTE(review): KafkaSource uses its first argument as the
        // "zookeeper.connect" address; ZooKeeper's default client port is 2181,
        // so confirm that port 7077 is intended here.
        graphBuilder.setSource("Source", new KafkaSource("localhost:7077", "group", "topic", 1), 1, 1);
        graphBuilder.setSink("Sink", new Sink(), 1, 1);
        graphBuilder.shuffleConnect("Source", "Sink");
        return graphBuilder.getJobGraph();
    }

    public static void main(String[] args) {
        LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
        ClusterUtil.runOnMiniCluster(getJobGraph());
    }
}
\ No newline at end of file
......@@ -25,14 +25,14 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
*/
public class WindowState<K> {
public class SlidingWindowState<K> {
private int currentRecordCount;
private int fullRecordCount;
private int slideRecordCount;
CircularFifoBuffer buffer;
public WindowState(int windowSize, int slidingStep, int computeGranularity) {
public SlidingWindowState(int windowSize, int slidingStep, int computeGranularity) {
this.currentRecordCount = 0;
// here we assume that windowSize and slidingStep are divisible by
// computeGranularity.
......
......@@ -18,7 +18,7 @@ package eu.stratosphere.streaming.state;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowStateIterator<K>{
public class SlidingWindowStateIterator<K>{
public boolean hasNext() {
// TODO Auto-generated method stub
......
......@@ -21,7 +21,7 @@ import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.state.LogTableState;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.TableStateIterator;
import eu.stratosphere.streaming.state.WindowState;
import eu.stratosphere.streaming.state.SlidingWindowState;
public class InternalStateTest {
......@@ -83,7 +83,7 @@ public class InternalStateTest {
@Test
public void WindowStateTest(){
WindowState<String> state=new WindowState<String>(100, 20, 10);
SlidingWindowState<String> state=new SlidingWindowState<String>(100, 20, 10);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册