提交 836cfa80 编写于 作者: S ssc

polishing iteration examples

上级 552aae59
......@@ -9,3 +9,5 @@ target
filter.properties
tmp/*
pact/pact-tests/tmp/*
*.iml
.idea/*
......@@ -24,10 +24,15 @@ import eu.stratosphere.nephele.io.compression.CompressionLevel;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.pact.common.io.FileInputFormat;
import eu.stratosphere.pact.runtime.iterative.io.FakeOutputTask;
import eu.stratosphere.pact.runtime.iterative.playing.pagerank.PageWithRankInputFormat;
import eu.stratosphere.pact.runtime.task.DataSinkTask;
import eu.stratosphere.pact.runtime.task.DataSourceTask;
import eu.stratosphere.pact.runtime.task.RegularPactTask;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
......@@ -45,15 +50,28 @@ public class JobGraphUtils {
client.submitJobAndWait();
}
public static void connectLocal(AbstractJobVertex source, AbstractJobVertex target, TaskConfig sourceConfig)
public static JobInputVertex createInput(Class<?> stubClass, String path, String name, JobGraph graph,
int degreeOfParallelism) {
JobInputVertex inputVertex = new JobInputVertex(name, graph);
Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask.class;
inputVertex.setInputClass(clazz);
inputVertex.setNumberOfSubtasks(degreeOfParallelism);
inputVertex.setNumberOfSubtasksPerInstance(degreeOfParallelism);
TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
inputConfig.setStubClass(stubClass);
inputConfig.setStubParameter(FileInputFormat.FILE_PARAMETER_KEY, path);
return inputVertex;
}
public static void connectLocal(AbstractJobVertex source, AbstractJobVertex target)
throws JobGraphDefinitionException {
connectLocal(source, target, sourceConfig, DistributionPattern.POINTWISE, ShipStrategy.FORWARD);
connectLocal(source, target, DistributionPattern.POINTWISE, ShipStrategy.FORWARD);
}
public static void connectLocal(AbstractJobVertex source, AbstractJobVertex target, TaskConfig sourceConfig,
public static void connectLocal(AbstractJobVertex source, AbstractJobVertex target,
DistributionPattern distributionPattern, ShipStrategy shipStrategy) throws JobGraphDefinitionException {
source.connectTo(target, ChannelType.INMEMORY, CompressionLevel.NO_COMPRESSION, distributionPattern);
sourceConfig.addOutputShipStrategy(shipStrategy);
new TaskConfig(source.getConfiguration()).addOutputShipStrategy(shipStrategy);
}
public static JobTaskVertex createTask(Class<? extends RegularPactTask> task, String name, JobGraph graph, int dop) {
......@@ -90,6 +108,7 @@ public class JobGraphUtils {
JobOutputVertex sinkVertex = new JobOutputVertex(name, jobGraph);
sinkVertex.setOutputClass(DataSinkTask.class);
sinkVertex.setNumberOfSubtasks(degreeOfParallelism);
sinkVertex.setNumberOfSubtasksPerInstance(degreeOfParallelism);
return sinkVertex;
}
}
......@@ -45,15 +45,9 @@ public class IterativeMapReduce {
int degreeOfParallelism = 2;
JobGraph jobGraph = new JobGraph();
JobInputVertex input = new JobInputVertex("FileInput", jobGraph);
Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask.class;
input.setInputClass(clazz);
input.setNumberOfSubtasks(degreeOfParallelism);
input.setNumberOfSubtasksPerInstance(degreeOfParallelism);
TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
inputConfig.setStubClass(TokenTokenInputFormat.class);
inputConfig.setLocalStrategy(TaskConfig.LocalStrategy.NONE);
inputConfig.setStubParameter(FileInputFormat.FILE_PARAMETER_KEY, "file:///home/ssc/Desktop/iterative-mapreduce/");
JobInputVertex input = JobGraphUtils.createInput(TokenTokenInputFormat.class,
"file:///home/ssc/Desktop/stratosphere/test-inputs/iterative-mapreduce/", "FileInput", jobGraph,
degreeOfParallelism);
JobTaskVertex head = JobGraphUtils.createTask(BulkIterationHeadPactTask.class, "BulkIterationHead", jobGraph,
degreeOfParallelism);
......@@ -94,18 +88,17 @@ public class IterativeMapReduce {
JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", degreeOfParallelism);
JobOutputVertex fakeSyncOutput = JobGraphUtils.createSingletonFakeOutput(jobGraph, "FakeSyncOutput");
JobGraphUtils.connectLocal(input, head, inputConfig);
JobGraphUtils.connectLocal(input, head);
//TODO implicit order should be documented/configured somehow
JobGraphUtils.connectLocal(head, tail, headConfig, DistributionPattern.BIPARTITE, ShipStrategy.PARTITION_HASH);
JobGraphUtils.connectLocal(head, sync, headConfig);
JobGraphUtils.connectLocal(head, output, headConfig);
JobGraphUtils.connectLocal(tail, fakeTailOutput, tailConfig);
JobGraphUtils.connectLocal(sync, fakeSyncOutput, syncConfig);
JobGraphUtils.connectLocal(head, tail, DistributionPattern.BIPARTITE, ShipStrategy.PARTITION_HASH);
JobGraphUtils.connectLocal(head, sync);
JobGraphUtils.connectLocal(head, output);
JobGraphUtils.connectLocal(tail, fakeTailOutput);
JobGraphUtils.connectLocal(sync, fakeSyncOutput);
head.setVertexToShareInstancesWith(tail);
GlobalConfiguration.loadConfiguration(
"/home/ssc/Entwicklung/projects/stratosphere-iterations/stratosphere-dist/src/main/stratosphere-bin/conf");
GlobalConfiguration.loadConfiguration("/home/ssc/Desktop/stratosphere/local-conf");
Configuration conf = GlobalConfiguration.getConfiguration();
JobGraphUtils.submit(jobGraph, conf);
......
......@@ -8,8 +8,6 @@ import eu.stratosphere.pact.common.type.base.PactLong;
public class DotProductMatch extends MatchStub {
private final PactRecord record = new PactRecord();
@Override
public void match(PactRecord pageWithRank, PactRecord transitionMatrixEntry, Collector<PactRecord> collector)
throws Exception {
......@@ -18,9 +16,13 @@ public class DotProductMatch extends MatchStub {
double rank = pageWithRank.getField(1, PactDouble.class).getValue();
double transitionProbability = transitionMatrixEntry.getField(2, PactDouble.class).getValue();
PactRecord record = new PactRecord();
record.setField(0, new PactLong(vertexID));
record.setField(1, new PactDouble(rank * transitionProbability));
long source = transitionMatrixEntry.getField(0, PactLong.class).getValue();
System.out.println("Match from " + source + " to " + vertexID + ": " + rank + " * " + transitionProbability + " = " + (rank * transitionProbability));
collector.collect(record);
}
}
......@@ -5,6 +5,7 @@ import eu.stratosphere.pact.common.stubs.Collector;
import eu.stratosphere.pact.common.stubs.ReduceStub;
import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.type.base.PactDouble;
import eu.stratosphere.pact.common.type.base.PactLong;
import java.util.Iterator;
......@@ -13,13 +14,20 @@ public class DotProductReducer extends ReduceStub {
@Override
public void reduce(Iterator<PactRecord> records, Collector<PactRecord> collector) throws Exception {
PactRecord accumulator = records.next();
PactDouble rank = accumulator.getField(1, PactDouble.class);
double sum = rank.getValue();
PactRecord accumulator = new PactRecord();
double sum = 0;
while (records.hasNext()) {
sum += records.next().getField(1, PactDouble.class).getValue();
PactRecord record = records.next();
accumulator.setField(0, record.getField(0, PactLong.class));
sum += record.getField(1, PactDouble.class).getValue();
System.out.println("\t" + record.getField(0, PactLong.class) + " " + record.getField(1, PactDouble.class));
}
rank.setValue(sum);
accumulator.setField(1, rank);
accumulator.setField(1, new PactDouble(sum));
System.out.println("Reduce: " + accumulator.getField(0, PactLong.class) + " " + sum);
collector.collect(accumulator);
}
}
......@@ -3,6 +3,7 @@ package eu.stratosphere.pact.runtime.iterative.playing.pagerank;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.io.DistributionPattern;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
......@@ -11,9 +12,7 @@ import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.pact.common.io.FileInputFormat;
import eu.stratosphere.pact.common.io.FileOutputFormat;
import eu.stratosphere.pact.common.type.base.PactLong;
import eu.stratosphere.pact.common.type.base.PactString;
import eu.stratosphere.pact.runtime.iterative.playing.JobGraphUtils;
import eu.stratosphere.pact.runtime.iterative.playing.iterativemapreduce.AppendTokenOutFormat;
import eu.stratosphere.pact.runtime.iterative.task.BulkIterationHeadPactTask;
import eu.stratosphere.pact.runtime.iterative.task.BulkIterationSynchronizationPactTask;
import eu.stratosphere.pact.runtime.iterative.task.BulkIterationTailPactTask;
......@@ -31,27 +30,19 @@ public class PageRank {
public static void main(String[] args) throws Exception {
int degreeOfParallelism = 1;
JobGraph jobGraph = new JobGraph();
JobGraph jobGraph = new JobGraph("PageRank");
JobInputVertex pageWithRankInput = new JobInputVertex("PageWithRankInput", jobGraph);
Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask.class;
pageWithRankInput.setInputClass(clazz);
pageWithRankInput.setNumberOfSubtasks(degreeOfParallelism);
pageWithRankInput.setNumberOfSubtasksPerInstance(degreeOfParallelism);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.setStubClass(PageWithRankInputFormat.class);
pageWithRankInputConfig.setLocalStrategy(TaskConfig.LocalStrategy.NONE);
pageWithRankInputConfig.setStubParameter(FileInputFormat.FILE_PARAMETER_KEY,
"file:///home/ssc/Desktop/iterative-mapreduce/");
JobInputVertex pageWithRankInput = JobGraphUtils.createInput(PageWithRankInputFormat.class,
"file:///home/ssc/Desktop/stratosphere/test-inputs/pagerank/pageWithRank", "PageWithRankInput", jobGraph,
degreeOfParallelism);
JobInputVertex transitionMatrixInput = new JobInputVertex("TransitionMatrixInput", jobGraph);
transitionMatrixInput.setNumberOfSubtasks(degreeOfParallelism);
transitionMatrixInput.setNumberOfSubtasksPerInstance(degreeOfParallelism);
JobInputVertex transitionMatrixInput = JobGraphUtils.createInput(TransitionMatrixInputFormat.class,
"file:///home/ssc/Desktop/stratosphere/test-inputs/pagerank/transitionMatrix", "TransitionMatrixInput",
jobGraph, degreeOfParallelism);
TaskConfig transitionMatrixInputConfig = new TaskConfig(transitionMatrixInput.getConfiguration());
transitionMatrixInputConfig.setStubClass(TransitionMatrixInputFormat.class);
transitionMatrixInputConfig.setLocalStrategy(TaskConfig.LocalStrategy.NONE);
transitionMatrixInputConfig.setStubParameter(FileInputFormat.FILE_PARAMETER_KEY,
"file:///home/ssc/Desktop/iterative-mapreduce/");
transitionMatrixInputConfig.setComparatorFactoryForOutput(PactRecordComparatorFactory.class, 0);
PactRecordComparatorFactory.writeComparatorSetupToConfig(transitionMatrixInput.getConfiguration(),
"pact.out.param.0.", new int[] { 1 }, new Class[] { PactLong.class });
JobTaskVertex head = JobGraphUtils.createTask(BulkIterationHeadPactTask.class, "BulkIterationHead", jobGraph,
degreeOfParallelism);
......@@ -63,9 +54,9 @@ public class PageRank {
new Class[] { PactLong.class });
PactRecordComparatorFactory.writeComparatorSetupToConfig(head.getConfiguration(), "pact.in.param.1.", new int[] { 0 },
new Class[] { PactLong.class });
headConfig.setMemorySize(10 * JobGraphUtils.MEGABYTE);
headConfig.setMemorySize(20 * JobGraphUtils.MEGABYTE);
headConfig.setBackChannelMemoryFraction(0.8f);
headConfig.setNumberOfIterations(3);
headConfig.setNumberOfIterations(1);
JobTaskVertex tail = JobGraphUtils.createTask(BulkIterationTailPactTask.class, "BulkIterationTail", jobGraph,
degreeOfParallelism);
......@@ -74,7 +65,7 @@ public class PageRank {
tailConfig.setDriver(ReduceDriver.class);
tailConfig.setStubClass(DotProductReducer.class);
PactRecordComparatorFactory.writeComparatorSetupToConfig(tail.getConfiguration(), "pact.in.param.0.", new int[] { 0 },
new Class[] { PactString.class });
new Class[] { PactLong.class });
tailConfig.setMemorySize(3 * JobGraphUtils.MEGABYTE);
tailConfig.setNumFilehandles(2);
tailConfig.setNumberOfEventsUntilInterruptInIterativeGate(0, degreeOfParallelism);
......@@ -88,29 +79,27 @@ public class PageRank {
JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
outputConfig.setStubClass(AppendTokenOutFormat.class);
outputConfig.setStubClass(PageWithRankOutFormat.class);
outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, "file:///tmp/stratosphere/iterations");
JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", degreeOfParallelism);
JobOutputVertex fakeSyncOutput = JobGraphUtils.createSingletonFakeOutput(jobGraph, "FakeSyncOutput");
JobGraphUtils.connectLocal(pageWithRankInput, head, pageWithRankInputConfig, DistributionPattern.BIPARTITE,
ShipStrategy.BROADCAST);
JobGraphUtils.connectLocal(transitionMatrixInput, head, transitionMatrixInputConfig);
JobGraphUtils.connectLocal(pageWithRankInput, head, DistributionPattern.BIPARTITE, ShipStrategy.BROADCAST);
JobGraphUtils.connectLocal(transitionMatrixInput, head, DistributionPattern.POINTWISE,
ShipStrategy.PARTITION_HASH);
//TODO implicit order should be documented/configured somehow
JobGraphUtils.connectLocal(head, tail, headConfig, DistributionPattern.BIPARTITE, ShipStrategy.FORWARD);
JobGraphUtils.connectLocal(head, sync, headConfig);
JobGraphUtils.connectLocal(head, output, headConfig);
JobGraphUtils.connectLocal(tail, fakeTailOutput, tailConfig);
JobGraphUtils.connectLocal(sync, fakeSyncOutput, syncConfig);
JobGraphUtils.connectLocal(head, tail, DistributionPattern.BIPARTITE, ShipStrategy.FORWARD);
JobGraphUtils.connectLocal(head, sync);
JobGraphUtils.connectLocal(head, output);
JobGraphUtils.connectLocal(tail, fakeTailOutput);
JobGraphUtils.connectLocal(sync, fakeSyncOutput);
head.setVertexToShareInstancesWith(tail);
GlobalConfiguration.loadConfiguration(
"/home/ssc/Entwicklung/projects/stratosphere-iterations/stratosphere-dist/src/main/stratosphere-bin/conf");
GlobalConfiguration.loadConfiguration("/home/ssc/Desktop/stratosphere/local-conf");
Configuration conf = GlobalConfiguration.getConfiguration();
JobGraphUtils.submit(jobGraph, conf);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.playing.pagerank;
import com.google.common.base.Charsets;
import eu.stratosphere.pact.common.io.FileOutputFormat;
import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.type.base.PactDouble;
import eu.stratosphere.pact.common.type.base.PactLong;
import java.io.IOException;
public class PageWithRankOutFormat extends FileOutputFormat {
private final StringBuilder buffer = new StringBuilder();
@Override
public void writeRecord(PactRecord record) throws IOException {
buffer.setLength(0);
buffer.append(record.getField(0, PactLong.class).toString());
buffer.append('\t');
buffer.append(record.getField(1, PactDouble.class).toString());
buffer.append('\n');
byte[] bytes = buffer.toString().getBytes(Charsets.UTF_8);
stream.write(bytes);
}
}
......@@ -43,15 +43,8 @@ public class Simple {
int degreeOfParallelism = 2;
JobGraph jobGraph = new JobGraph();
JobInputVertex input = new JobInputVertex("FileInput", jobGraph);
Class<AbstractInputTask<?>> clazz = (Class<AbstractInputTask<?>>) (Class<?>) DataSourceTask.class;
input.setInputClass(clazz);
input.setNumberOfSubtasks(degreeOfParallelism);
input.setNumberOfSubtasksPerInstance(degreeOfParallelism);
TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
inputConfig.setStubClass(TextInputFormat.class);
inputConfig.setLocalStrategy(TaskConfig.LocalStrategy.NONE);
inputConfig.setStubParameter(FileInputFormat.FILE_PARAMETER_KEY, "file:///home/ssc/Desktop/iterations/");
JobInputVertex input = JobGraphUtils.createInput(TextInputFormat.class,
"file:///home/ssc/Desktop/stratosphere/test-inputs/simple", "FileInput", jobGraph, degreeOfParallelism) ;
JobTaskVertex head = JobGraphUtils.createTask(BulkIterationHeadPactTask.class, "BulkIterationHead", jobGraph,
degreeOfParallelism);
......@@ -92,19 +85,18 @@ public class Simple {
JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", degreeOfParallelism);
JobOutputVertex fakeSyncOutput = JobGraphUtils.createSingletonFakeOutput(jobGraph, "FakeSyncOutput");
JobGraphUtils.connectLocal(input, head, inputConfig);
JobGraphUtils.connectLocal(input, head);
//TODO implicit order should be documented/configured somehow
JobGraphUtils.connectLocal(head, intermediate, headConfig);
JobGraphUtils.connectLocal(head, sync, headConfig);
JobGraphUtils.connectLocal(head, output, headConfig);
JobGraphUtils.connectLocal(intermediate, tail, intermediateConfig);
JobGraphUtils.connectLocal(tail, fakeTailOutput, tailConfig);
JobGraphUtils.connectLocal(sync, fakeSyncOutput, syncConfig);
JobGraphUtils.connectLocal(head, intermediate);
JobGraphUtils.connectLocal(head, sync);
JobGraphUtils.connectLocal(head, output);
JobGraphUtils.connectLocal(intermediate, tail);
JobGraphUtils.connectLocal(tail, fakeTailOutput);
JobGraphUtils.connectLocal(sync, fakeSyncOutput);
head.setVertexToShareInstancesWith(tail);
GlobalConfiguration.loadConfiguration(
"/home/ssc/Entwicklung/projects/stratosphere-iterations/stratosphere-dist/src/main/stratosphere-bin/conf");
GlobalConfiguration.loadConfiguration("/home/ssc/Desktop/stratosphere/local-conf");
Configuration conf = GlobalConfiguration.getConfiguration();
JobGraphUtils.submit(jobGraph, conf);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册