提交 900c8d8e 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] incremental machine learning skeleton added

上级 08d2ac62
......@@ -72,6 +72,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
private UID uid = new UID();
private int numOfFields;
private int numOfTuples;
private int batchSize;
private static final Class<?>[] CLASSES = new Class<?>[] { Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class,
Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class,
......@@ -95,6 +96,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord(int numOfFields, int batchSize) {
this.numOfFields = numOfFields;
this.numOfTuples = 0;
this.batchSize = batchSize;
tupleBatch = new ArrayList<Tuple>(batchSize);
}
......@@ -111,6 +113,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public StreamRecord(Tuple tuple, int batchSize) {
numOfFields = tuple.getArity();
numOfTuples = 1;
this.batchSize = batchSize;
tupleBatch = new ArrayList<Tuple>(batchSize);
tupleBatch.add(tuple);
......@@ -159,6 +162,14 @@ public class StreamRecord implements IOReadableWritable, Serializable {
uid = new UID(channelID);
return this;
}
public void InitRecords(){
tupleBatch.clear();
for(int i=0;i<batchSize;i++){
tupleBatch.add(null);
}
numOfTuples = batchSize;
}
/**
* Returns the value of a field in the given position of the first tuple in
......
package eu.stratosphere.streaming.examples.ml;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.LogUtils;
public class IncrementalLearning {
public static class NewDataSource extends UserSourceInvokable {
StreamRecord record = new StreamRecord(new Tuple1<Integer>(1));
@Override
public void invoke() throws Exception {
while (true) {
// pull new record from data source
record.setTuple(getNewData());
emit(record);
}
}
private Tuple getNewData() throws InterruptedException {
Thread.sleep(1000);
return new Tuple1<Integer>(1);
}
}
public static class OldDataSource extends UserSourceInvokable {
private final int BATCH_SIZE = 1000;
StreamRecord record = new StreamRecord(2, BATCH_SIZE);
@Override
public void invoke() throws Exception {
record.InitRecords();
while (true) {
for (int i = 0; i < BATCH_SIZE; i++) {
record.setTuple(getOldData());
}
emit(record);
}
}
private Tuple getOldData() throws InterruptedException {
Thread.sleep(1000);
return new Tuple1<Integer>(1);
}
}
public static class WindowPredictor extends UserTaskInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
//build predictor on record
}
}
public static class Sink extends UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
// do nothing
}
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("IncrementalLearning");
return graphBuilder.getJobGraph();
}
public static void main(String[] args) {
// set logging parameters for local run
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
try {
// generate JobGraph
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0 || args[0].equals("local")) {
System.out.println("Running in Local mode");
// start local cluster and submit JobGraph
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
// submit JobGraph to the running cluster
Client client = new Client(new InetSocketAddress("dell150", 6123), configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册