提交 ca1e41da 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] fix several bugs in in the examples. Window operator runnable

上级 4085a274
......@@ -137,7 +137,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this.batchSize = batchSize;
tupleBatch = new ArrayList<Tuple>(batchSize);
tupleBatch.add(tuple);
}
/**
......@@ -939,35 +938,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
throw new TupleSizeMismatchException();
}
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the shadow copy of Tuple to the end of the batch
*
* @param tuple
* Tuple to be added as the next record of the batch
*/
public void addShadowTuple(Tuple tuple) throws TupleSizeMismatchException {
addShadowTuple(numOfTuples, tuple);
}
/**
* Checks if the number of fields are equal to the batch field size then
* inserts the shadow copy of Tuple to the given position into the recordbatch
*
* @param index
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
*/
public void addShadowTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
tupleBatch.add(index, tuple);
numOfTuples++;
} else {
throw new TupleSizeMismatchException();
}
}
/**
* Creates a copy of the StreamRecord object by Serializing and
......
......@@ -15,38 +15,45 @@
package eu.stratosphere.streaming.examples.batch.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
public class BatchWordCountCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>();
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(new Tuple3<String, Integer, Long>());
private StreamRecord outRecord = new StreamRecord(3);
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
timestamp = record.getLong(1);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
}
}
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
}
\ No newline at end of file
......@@ -26,12 +26,14 @@ public class BatchWordCountSink extends UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
count = record.getInteger(1);
timestamp = record.getLong(2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
}
}
}
......@@ -28,9 +28,6 @@ public class BatchWordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line = "";
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Long>());
private final static int BATCH_SIZE = 20;
private Long timestamp = 0L;
public BatchWordCountSource() {
......@@ -39,29 +36,24 @@ public class BatchWordCountSource extends UserSourceInvokable {
} catch (FileNotFoundException e) {
e.printStackTrace();
}
timestamp = 0L;
}
@Override
public void invoke() throws Exception {
timestamp = 0L;
outRecord = new StreamRecord(2);
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
for(int i=0; i<100; ++i) {
line = br.readLine();
if(line==null){
break;
}
if (line != "") {
outRecord.addTuple(new Tuple2<String, Long>(line, timestamp));
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
System.out.println("line="+line);
outRecord.setString(0, line);
outRecord.setLong(1, timestamp);
timestamp++;
if (timestamp % BATCH_SIZE == 0) {
emit(outRecord);
outRecord = new StreamRecord(2);
}
emit(outRecord);
}
line = br.readLine();
}
}
}
\ No newline at end of file
}
......@@ -15,31 +15,26 @@
package eu.stratosphere.streaming.examples.batch.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BatchWordCountSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(new Tuple2<String,Long>());
private StreamRecord outputRecord = new StreamRecord(3);
private Long timestamp =0L;
private Long timestamp = 0L;
@Override
public void invoke(StreamRecord record) throws Exception {
int numberOfRecords = record.getNumOfTuples();
for (int i = 0; i < numberOfRecords; ++i) {
words = record.getString(0).split(" ");
timestamp=record.getLong(1);
for (String word : words) {
outputRecord.setString(0, word);
outputRecord.setLong(1, timestamp);
emit(outputRecord);
}
words = record.getString(0).split(" ");
timestamp = record.getLong(1);
System.out.println("sentence=" + record.getString(0) + ", timestamp="
+ record.getLong(1));
for (String word : words) {
Tuple3<String, Integer, Long> tuple =new Tuple3<String, Integer, Long>(word, 1, timestamp);
outputRecord.addTuple(tuple);
}
emit(outputRecord);
}
}
\ No newline at end of file
......@@ -15,10 +15,12 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
import eu.stratosphere.streaming.state.WindowState;
public class WindowWordCountCounter extends UserTaskInvokable {
......@@ -34,36 +36,42 @@ public class WindowWordCountCounter extends UserTaskInvokable {
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(
new Tuple3<String, Integer, Long>());
private StreamRecord outRecord = new StreamRecord(3);
public WindowWordCountCounter() {
windowSize = 100;
slidingStep = 20;
computeGranularity = 10;
windowFieldId = 2;
window = new WindowState<Integer>(windowSize, slidingStep, computeGranularity, windowFieldId);
window = new WindowState<Integer>(windowSize, slidingStep,
computeGranularity, windowFieldId);
wordCounts = new MutableTableState<String, Integer>();
}
private void incrementCompute(StreamRecord record) {
word = record.getString(0);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
}
}
}
private void decrementCompute(StreamRecord record) {
word = record.getString(0);
count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
} else {
wordCounts.put(word, count);
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
} else {
wordCounts.put(word, count);
}
}
}
......@@ -75,18 +83,28 @@ public class WindowWordCountCounter extends UserTaskInvokable {
decrementCompute(expiredRecord);
window.pushBack(record);
if (window.isComputable()) {
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
} else {
incrementCompute(record);
window.pushBack(record);
if(window.isFull()){
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
if (window.isFull()) {
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
}
......
......@@ -26,12 +26,14 @@ public class WindowWordCountSink extends UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
count = record.getInteger(1);
timestamp = record.getLong(2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
}
}
}
......@@ -26,10 +26,9 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowWordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line = new String();
private String line = "";
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Long>());
private Long timestamp;
private Long timestamp = 0L;
public WindowWordCountSource() {
try {
......@@ -37,20 +36,24 @@ public class WindowWordCountSource extends UserSourceInvokable {
} catch (FileNotFoundException e) {
e.printStackTrace();
}
timestamp = 0L;
}
@Override
public void invoke() throws Exception {
timestamp = 0L;
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
for(int i=0; i<10; ++i) {
line = br.readLine();
if(line==null){
break;
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
System.out.println("line="+line);
outRecord.setString(0, line);
outRecord.setLong(1, timestamp);
timestamp++;
emit(outRecord);
}
line = br.readLine();
timestamp++;
}
}
}
......@@ -15,14 +15,13 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowWordCountSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(new Tuple2<String, Long>());
private StreamRecord outputRecord = new StreamRecord(3);
private Long timestamp = 0L;
......@@ -30,13 +29,12 @@ public class WindowWordCountSplitter extends UserTaskInvokable {
public void invoke(StreamRecord record) throws Exception {
words = record.getString(0).split(" ");
timestamp = record.getLong(1);
System.out.println("************sentence=" + words + ", timestamp=" + timestamp
+ "************");
System.out.println("sentence=" + record.getString(0) + ", timestamp="
+ record.getLong(1));
for (String word : words) {
outputRecord.setString(0, word);
outputRecord.setLong(1, timestamp);
emit(outputRecord);
Tuple3<String, Integer, Long> tuple =new Tuple3<String, Integer, Long>(word, 1, timestamp);
outputRecord.addTuple(tuple);
}
emit(outputRecord);
}
}
\ No newline at end of file
......@@ -19,7 +19,6 @@ import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountCounter extends UserTaskInvokable {
......
......@@ -28,16 +28,11 @@ import eu.stratosphere.streaming.util.LogUtils;
public class WordCountLocal {
private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance,
int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks,
int sinkSubtasksPerInstance) throws Exception {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,
sourceSubtasks, sourceSubtasksPerInstance);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks,
counterSubtasksPerInstance);
graphBuilder.setSink("WordCountSink", WordCountSink.class, sinkSubtasks,
sinkSubtasksPerInstance);
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.fieldsConnect("WordCountSourceSplitter", "WordCountCounter", 0);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
......@@ -45,71 +40,44 @@ public class WordCountLocal {
return graphBuilder.getJobGraph();
}
private static void wrongArgs() {
System.out
.println("USAGE:\n"
+ "run <local/cluster> <SOURCE num of subtasks> <SOURCE subtasks per instance> <SPLITTER num of subtasks> <SPLITTER subtasks per instance> <COUNTER num of subtasks> <COUNTER subtasks per instance> <SINK num of subtasks> <SINK subtasks per instance>");
}
// TODO: arguments check
public static void main(String[] args) {
if (args.length != 7) {
wrongArgs();
} else {
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
int sourceSubtasks = 1;
int sourceSubtasksPerInstance = 1;
int counterSubtasks = 1;
int counterSubtasksPerInstance = 1;
int sinkSubtasks = 1;
int sinkSubtasksPerInstance = 1;
try {
sourceSubtasks = Integer.parseInt(args[1]);
sourceSubtasksPerInstance = Integer.parseInt(args[2]);
counterSubtasks = Integer.parseInt(args[3]);
counterSubtasksPerInstance = Integer.parseInt(args[4]);
sinkSubtasks = Integer.parseInt(args[5]);
sinkSubtasksPerInstance = Integer.parseInt(args[6]);
} catch (Exception e) {
wrongArgs();
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
try {
JobGraph jG = getJobGraph(sourceSubtasks, sourceSubtasksPerInstance,
counterSubtasks, counterSubtasksPerInstance, sinkSubtasks,
sinkSubtasksPerInstance);
Configuration configuration = jG.getJobConfiguration();
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
if (args.length == 0) {
args = new String[] { "local" };
}
exec.start();
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
exec.start();
client.run(jG, true);
Client client = new Client(new InetSocketAddress("localhost", 6498),
configuration);
exec.stop();
client.run(jG, true);
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
Client client = new Client(new InetSocketAddress("dell150", 6123),
configuration);
client.run(jG, true);
}
client.run(jG, true);
} catch (Exception e) {
System.out.println(e);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
......@@ -16,7 +16,6 @@
package eu.stratosphere.streaming.examples.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple1;
......@@ -31,27 +30,20 @@ public class WordCountSource extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 2; i++) {
try {
br = new BufferedReader(new FileReader(
"/home/strato/stratosphere-distrib/resources/hamlet.txt"));
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
outRecord.setString(0, line);
emit(outRecord);
performanceCounter.count();
}
line = br.readLine();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
while (true) {
line = br.readLine();
if (line == null) {
break;
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
outRecord.setString(0, line);
emit(outRecord);
performanceCounter.count();
}
line = br.readLine();
}
}
}
\ No newline at end of file
......@@ -16,7 +16,6 @@
package eu.stratosphere.streaming.examples.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple1;
......@@ -31,28 +30,22 @@ public class WordCountSourceSplitter extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
while (true) {
try {
br = new BufferedReader(new FileReader(
"/home/strato/stratosphere-distrib/resources/hamlet.txt"));
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
for (String word : line.split(" ")) {
outRecord.setString(0, word);
emit(outRecord);
performanceCounter.count();
}
}
line = br.readLine();
line = br.readLine();
if (line == null) {
break;
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
for (String word : line.split(" ")) {
outRecord.setString(0, word);
System.out.println("word=" + word);
emit(outRecord);
performanceCounter.count();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.wordcount;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountStarter {
private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance,
int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks,
int sinkSubtasksPerInstance) throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,
sourceSubtasks, sourceSubtasksPerInstance);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks,
counterSubtasksPerInstance);
graphBuilder.setSink("WordCountSink", WordCountSink.class, sinkSubtasks,
sinkSubtasksPerInstance);
graphBuilder.fieldsConnect("WordCountSourceSplitter", "WordCountCounter", 0);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
private static void wrongArgs() {
System.out
.println("USAGE:\n"
+ "run <local/cluster> <SOURCE num of subtasks> <SOURCE subtasks per instance> <SPLITTER num of subtasks> <SPLITTER subtasks per instance> <COUNTER num of subtasks> <COUNTER subtasks per instance> <SINK num of subtasks> <SINK subtasks per instance>");
}
// TODO: arguments check
public static void main(String[] args) {
if (args.length != 7) {
wrongArgs();
} else {
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
int sourceSubtasks = 1;
int sourceSubtasksPerInstance = 1;
int counterSubtasks = 1;
int counterSubtasksPerInstance = 1;
int sinkSubtasks = 1;
int sinkSubtasksPerInstance = 1;
try {
sourceSubtasks = Integer.parseInt(args[1]);
sourceSubtasksPerInstance = Integer.parseInt(args[2]);
counterSubtasks = Integer.parseInt(args[3]);
counterSubtasksPerInstance = Integer.parseInt(args[4]);
sinkSubtasks = Integer.parseInt(args[5]);
sinkSubtasksPerInstance = Integer.parseInt(args[6]);
} catch (Exception e) {
wrongArgs();
}
try {
JobGraph jG = getJobGraph(sourceSubtasks, sourceSubtasksPerInstance,
counterSubtasks, counterSubtasksPerInstance, sinkSubtasks,
sinkSubtasksPerInstance);
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498),
configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
Client client = new Client(new InetSocketAddress("dell150", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
}
......@@ -49,7 +49,7 @@ public class MutableTableState<K, V> implements TableState<K, V> {
}
@Override
public TableStateIterator<K, V> getIterator() {
public MutableTableStateIterator<K, V> getIterator() {
// TODO Auto-generated method stub
return new MutableTableStateIterator<K, V>(state.entrySet().iterator());
}
......
......@@ -65,7 +65,7 @@ public class WindowState<K> {
public void pushBack(StreamRecord record) {
if (initTimestamp == -1) {
initTimestamp = record.getTuple(0).getField(windowFieldId);
initTimestamp = (Integer) record.getTuple(0).getField(windowFieldId);
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
......@@ -75,7 +75,7 @@ public class WindowState<K> {
currentRecordCount += 1;
tempRecord = new StreamRecord(record.getNumOfFields());
}
tempRecord.addShadowTuple(record.getTuple(i));
tempRecord.addTuple(record.getTuple(i));
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册