提交 3fe087d9 编写于 作者: S sewen

Implemented dangling pagerank with generic data types.

上级 5f58ce70
......@@ -129,10 +129,10 @@ public class AsciiLongArrayView {
}
}
public double elementAsDouble() {
String token = new String(buffer, tokenOffset, tokenNumBytes, Charsets.US_ASCII);
return Double.valueOf(token);
}
// public double elementAsDouble() {
// String token = new String(buffer, tokenOffset, tokenNumBytes, Charsets.US_ASCII);
// return Double.valueOf(token);
// }
@Override
......
......@@ -111,7 +111,7 @@ public class CompensatableDanglingPageRank {
// --------------- the inputs ---------------------
// page rank input
JobInputVertex pageWithRankInput = JobGraphUtils.createInput(DanglingPageGenerateRankInputFormat.class,
JobInputVertex pageWithRankInput = JobGraphUtils.createInput(ImprovedDanglingPageRankInputFormat.class,
pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
......@@ -120,7 +120,7 @@ public class CompensatableDanglingPageRank {
pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
// edges as adjacency list
JobInputVertex adjacencyListInput = JobGraphUtils.createInput(AdjacencyListInputFormat.class,
JobInputVertex adjacencyListInput = JobGraphUtils.createInput(ImprovedAdjacencyListInputFormat.class,
adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
......
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.io.DistributionPattern;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.common.io.FileOutputFormat;
import eu.stratosphere.pact.generic.types.TypeComparatorFactory;
import eu.stratosphere.pact.generic.types.TypePairComparatorFactory;
import eu.stratosphere.pact.generic.types.TypeSerializerFactory;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.DiffL1NormConvergenceCriterion;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyList;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyListComparatorFactory;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyListSerializerFactory;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRank;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDanglingComparatorFactory;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDanglingSerializerFactory;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankComparatorFactory;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankDanglingToVertexWithRankPairComparatorFactory;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankSerializerFactory;
import eu.stratosphere.pact.runtime.iterative.playing.JobGraphUtils;
import eu.stratosphere.pact.runtime.iterative.playing.PlayConstants;
import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationIntermediatePactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.BuildSecondCachedMatchDriver;
import eu.stratosphere.pact.runtime.task.CoGroupDriver;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.MapDriver;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
public class CustomCompensatableDanglingPageRank {
private static final int NUM_FILE_HANDLES_PER_SORT = 64;
private static final float SORT_SPILL_THRESHOLD = 0.85f;
private static TypeSerializerFactory<VertexWithRank> vertexWithRankSerializer = new VertexWithRankSerializerFactory();
private static TypeSerializerFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingSerializer = new VertexWithRankAndDanglingSerializerFactory();
private static TypeSerializerFactory<VertexWithAdjacencyList> vertexWithAdjacencyListSerializer = new VertexWithAdjacencyListSerializerFactory();
private static TypeComparatorFactory<VertexWithRank> vertexWithRankComparator = new VertexWithRankComparatorFactory();
private static TypeComparatorFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingComparator = new VertexWithRankAndDanglingComparatorFactory();
private static TypeComparatorFactory<VertexWithAdjacencyList> vertexWithAdjacencyListComparator = new VertexWithAdjacencyListComparatorFactory();
private static TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithAdjacencyList> matchComparator =
new VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory();
private static TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithRank> coGroupComparator =
new VertexWithRankDanglingToVertexWithRankPairComparatorFactory();
public static void main(String[] args) throws Exception {
String confPath = args.length >= 6 ? confPath = args[5] : PlayConstants.PLAY_DIR + "local-conf";
GlobalConfiguration.loadConfiguration(confPath);
Configuration conf = GlobalConfiguration.getConfiguration();
JobGraph jobGraph = getJobGraph(args);
JobGraphUtils.submit(jobGraph, conf);
}
public static JobGraph getJobGraph(String[] args) throws Exception {
int degreeOfParallelism = 2;
int numSubTasksPerInstance = degreeOfParallelism;
String pageWithRankInputPath = "file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank";
String adjacencyListInputPath = "file://" + PlayConstants.PLAY_DIR +
"test-inputs/danglingpagerank/adjacencylists";
String outputPath = "file:///tmp/stratosphere/iterations";
// String confPath = PlayConstants.PLAY_DIR + "local-conf";
int minorConsumer = 25;
int matchMemory = 50;
int coGroupSortMemory = 50;
int numIterations = 25;
long numVertices = 5;
long numDanglingVertices = 1;
String failingWorkers = "1";
int failingIteration = 2;
double messageLoss = 0.75;
if (args.length >= 15) {
degreeOfParallelism = Integer.parseInt(args[0]);
numSubTasksPerInstance = Integer.parseInt(args[1]);
pageWithRankInputPath = args[2];
adjacencyListInputPath = args[3];
outputPath = args[4];
// confPath = args[5];
minorConsumer = Integer.parseInt(args[6]);
matchMemory = Integer.parseInt(args[7]);
coGroupSortMemory = Integer.parseInt(args[8]);
numIterations = Integer.parseInt(args[9]);
numVertices = Long.parseLong(args[10]);
numDanglingVertices = Long.parseLong(args[11]);
failingWorkers = args[12];
failingIteration = Integer.parseInt(args[13]);
messageLoss = Double.parseDouble(args[14]);
}
JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank");
// --------------- the inputs ---------------------
// page rank input
JobInputVertex pageWithRankInput = JobGraphUtils.createInput(CustomImprovedDanglingPageRankInputFormat.class,
pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
pageWithRankInputConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
// edges as adjacency list
JobInputVertex adjacencyListInput = JobGraphUtils.createInput(CustomImprovedAdjacencyListInputFormat.class,
adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration());
adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer);
adjacencyListInputConfig.setOutputComparator(vertexWithAdjacencyListComparator, 0);
// --------------- the head ---------------------
JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
degreeOfParallelism, numSubTasksPerInstance);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
// initial input / partial solution
headConfig.addInputToGroup(0);
headConfig.setIterationHeadPartialSolutionInputIndex(0);
headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0);
headConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
headConfig.setMemoryInput(0, minorConsumer * JobGraphUtils.MEGABYTE);
headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
// back channel / iterations
headConfig.setBackChannelMemory(minorConsumer * JobGraphUtils.MEGABYTE);
// output into iteration
headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
// final output
TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
headFinalOutConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
// the sync
headConfig.setIterationHeadIndexOfSyncOutput(3);
headConfig.setNumberOfIterations(numIterations);
// the driver
headConfig.setDriver(MapDriver.class);
headConfig.setDriverStrategy(DriverStrategy.MAP);
headConfig.setStubClass(CustomCompensatingMap.class);
headConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
headConfig.setStubParameter("compensation.failingWorker", failingWorkers);
headConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
headConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
// --------------- the join ---------------------
JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
"IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class);
intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class);
intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
intermediateConfig.setMemoryDriver(matchMemory * JobGraphUtils.MEGABYTE);
intermediateConfig.addInputToGroup(0);
intermediateConfig.addInputToGroup(1);
intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
intermediateConfig.setInputSerializer(vertexWithAdjacencyListSerializer, 1);
intermediateConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
intermediateConfig.setDriverComparator(vertexWithAdjacencyListComparator, 1);
intermediateConfig.setDriverPairComparator(matchComparator);
intermediateConfig.setOutputSerializer(vertexWithRankSerializer);
intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
intermediateConfig.setOutputComparator(vertexWithRankComparator, 0);
intermediateConfig.setStubClass(CustomCompensatableDotProductMatch.class);
intermediateConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
intermediateConfig.setStubParameter("compensation.failingWorker", failingWorkers);
intermediateConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
intermediateConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
// ---------------- the tail (co group) --------------------
JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
degreeOfParallelism, numSubTasksPerInstance);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
// TODO we need to combine!
// inputs and driver
tailConfig.setDriver(CoGroupDriver.class);
tailConfig.setDriverStrategy(DriverStrategy.CO_GROUP);
tailConfig.addInputToGroup(0);
tailConfig.addInputToGroup(1);
tailConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
tailConfig.setInputSerializer(vertexWithRankSerializer, 1);
tailConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
tailConfig.setDriverComparator(vertexWithRankComparator, 1);
tailConfig.setDriverPairComparator(coGroupComparator);
tailConfig.setInputAsynchronouslyMaterialized(0, true);
tailConfig.setInputMaterializationMemory(0, minorConsumer * JobGraphUtils.MEGABYTE);
tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT);
tailConfig.setInputComparator(vertexWithRankComparator, 1);
tailConfig.setMemoryInput(1, coGroupSortMemory * JobGraphUtils.MEGABYTE);
tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
// output
tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer);
// the stub
tailConfig.setStubClass(CustomCompensatableDotProductCoGroup.class);
tailConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices));
tailConfig.setStubParameter("pageRank.numDanglingVertices", String.valueOf(numDanglingVertices));
tailConfig.setStubParameter("compensation.failingWorker", failingWorkers);
tailConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration));
tailConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss));
// --------------- the output ---------------------
JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism,
numSubTasksPerInstance);
TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
outputConfig.addInputToGroup(0);
outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
outputConfig.setStubClass(CustomPageWithRankOutFormat.class);
outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
// --------------- the auxiliaries ---------------------
JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput",
degreeOfParallelism, numSubTasksPerInstance);
JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism);
TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
syncConfig.setNumberOfIterations(numIterations);
syncConfig.setConvergenceCriterion(DiffL1NormConvergenceCriterion.class);
// --------------- the wiring ---------------------
JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
JobGraphUtils.connect(head, intermediate, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
JobGraphUtils.connect(adjacencyListInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
JobGraphUtils.connect(head, tail, ChannelType.NETWORK, DistributionPattern.POINTWISE);
JobGraphUtils.connect(intermediate, tail, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism);
JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
fakeTailOutput.setVertexToShareInstancesWith(tail);
tail.setVertexToShareInstancesWith(head);
pageWithRankInput.setVertexToShareInstancesWith(head);
adjacencyListInput.setVertexToShareInstancesWith(head);
intermediate.setVertexToShareInstancesWith(head);
output.setVertexToShareInstancesWith(head);
sync.setVertexToShareInstancesWith(head);
return jobGraph;
}
}
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.common.stubs.Collector;
import eu.stratosphere.pact.generic.stub.AbstractStub;
import eu.stratosphere.pact.generic.stub.GenericCoGrouper;
import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.DiffL1NormConvergenceCriterion;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.PageRankStats;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.PageRankStatsAggregator;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRank;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling;
import eu.stratosphere.pact.runtime.iterative.concurrent.IterationContext;
import java.util.Iterator;
import java.util.Set;
public class CustomCompensatableDotProductCoGroup extends AbstractStub implements GenericCoGrouper<VertexWithRankAndDangling, VertexWithRank, VertexWithRankAndDangling> {
private VertexWithRankAndDangling accumulator = new VertexWithRankAndDangling();
private PageRankStatsAggregator aggregator =
(PageRankStatsAggregator) new DiffL1NormConvergenceCriterion().createAggregator();
private long numVertices;
private long numDanglingVertices;
private double dampingFactor;
private double danglingRankFactor;
private static final double BETA = 0.85;
private int workerIndex;
private int currentIteration;
private int failingIteration;
private Set<Integer> failingWorkers;
@Override
public void open(Configuration parameters) throws Exception {
workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters);
currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters);
failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
numDanglingVertices = ConfigUtils.asLong("pageRank.numDanglingVertices", parameters);
aggregator.reset();
dampingFactor = (1d - BETA) / (double) numVertices;
if (currentIteration == 1) {
danglingRankFactor = BETA * (double) numDanglingVertices / ((double) numVertices * (double) numVertices);
} else {
PageRankStats previousAggregate = (PageRankStats) IterationContext.instance().getGlobalAggregate(
workerIndex);
danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices;
}
}
@Override
public void coGroup(Iterator<VertexWithRankAndDangling> currentPageRankIterator, Iterator<VertexWithRank> partialRanks,
Collector<VertexWithRankAndDangling> collector)
{
if (!currentPageRankIterator.hasNext()) {
long missingVertex = partialRanks.next().getVertexID();
throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
}
VertexWithRankAndDangling currentPageRank = currentPageRankIterator.next();
long edges = 0;
double summedRank = 0;
while (partialRanks.hasNext()) {
summedRank += partialRanks.next().getRank();
edges++;
}
double rank = BETA * summedRank + dampingFactor + danglingRankFactor;
double currentRank = currentPageRank.getRank();
boolean isDangling = currentPageRank.isDangling();
double danglingRankToAggregate = isDangling ? rank : 0;
long danglingVerticesToAggregate = isDangling ? 1 : 0;
double diff = Math.abs(currentRank - rank);
aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges, summedRank, 0);
accumulator.setVertexID(currentPageRank.getVertexID());
accumulator.setRank(rank);
accumulator.setDangling(isDangling);
collector.collect(accumulator);
}
@Override
public void close() throws Exception {
if (currentIteration == failingIteration && failingWorkers.contains(workerIndex)) {
aggregator.reset();
}
IterationContext.instance().setAggregate(workerIndex, aggregator.getAggregate());
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.common.stubs.Collector;
import eu.stratosphere.pact.generic.stub.AbstractStub;
import eu.stratosphere.pact.generic.stub.GenericMatcher;
import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyList;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRank;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling;
import java.util.Random;
import java.util.Set;
public class CustomCompensatableDotProductMatch extends AbstractStub implements
GenericMatcher<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank> {
private VertexWithRank record = new VertexWithRank();
private Random random = new Random();
private double messageLoss;
private boolean isFailure;
@Override
public void open(Configuration parameters) throws Exception {
int workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters);
int currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters);
int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
Set<Integer> failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
isFailure = currentIteration == failingIteration && failingWorkers.contains(workerIndex);
messageLoss = ConfigUtils.asDouble("compensation.messageLoss", parameters);
}
@Override
public void match(VertexWithRankAndDangling pageWithRank, VertexWithAdjacencyList adjacencyList, Collector<VertexWithRank> collector)
throws Exception
{
double rank = pageWithRank.getRank();
long[] adjacentNeighbors = adjacencyList.getTargets();
int numNeighbors = adjacencyList.getNumTargets();
double rankToDistribute = rank / (double) numNeighbors;
record.setRank(rankToDistribute);
for (int n = 0; n < numNeighbors; n++) {
record.setVertexID(adjacentNeighbors[n]);
if (isFailure) {
if (random.nextDouble() >= messageLoss) {
collector.collect(record);
}
} else {
collector.collect(record);
}
}
}
}
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.common.stubs.Collector;
import eu.stratosphere.pact.generic.stub.AbstractStub;
import eu.stratosphere.pact.generic.stub.GenericMapper;
import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.PageRankStats;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling;
import eu.stratosphere.pact.runtime.iterative.concurrent.IterationContext;
import java.util.Set;
public class CustomCompensatingMap extends AbstractStub implements GenericMapper<VertexWithRankAndDangling, VertexWithRankAndDangling> {
private boolean isFailureIteration;
private boolean isFailingWorker;
private double uniformRank;
private double rescaleFactor;
@Override
public void open(Configuration parameters) throws Exception {
int currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters);
int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
isFailureIteration = currentIteration == failingIteration + 1;
int workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters);
Set<Integer> failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
isFailingWorker = failingWorkers.contains(workerIndex);
long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
if (currentIteration > 1) {
PageRankStats stats = (PageRankStats) IterationContext.instance().getGlobalAggregate(workerIndex);
uniformRank = 1d / (double) numVertices;
double lostMassFactor = (numVertices - stats.numVertices()) / (double) numVertices;
rescaleFactor = (1 - lostMassFactor) / stats.rank();
}
}
@Override
public void map(VertexWithRankAndDangling pageWithRank, Collector<VertexWithRankAndDangling> out) throws Exception {
if (isFailureIteration) {
double rank = pageWithRank.getRank();
if (isFailingWorker) {
pageWithRank.setRank(uniformRank);
} else {
pageWithRank.setRank(rank * rescaleFactor);
}
}
out.collect(pageWithRank);
}
}
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom;
import eu.stratosphere.pact.generic.io.DelimitedInputFormat;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.AsciiLongArrayView;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyList;
public class CustomImprovedAdjacencyListInputFormat extends DelimitedInputFormat<VertexWithAdjacencyList> {
private final AsciiLongArrayView arrayView = new AsciiLongArrayView();
@Override
public boolean readRecord(VertexWithAdjacencyList target, byte[] bytes, int offset, int numBytes) {
if (numBytes == 0) {
return false;
}
arrayView.set(bytes, offset, numBytes);
long[] list = target.getTargets();
try {
int pos = 0;
while (arrayView.next()) {
if (pos == 0) {
target.setVertexID(arrayView.element());
} else {
if (list.length <= pos - 1) {
list = new long[list.length < 16 ? 16 : list.length * 2];
target.setTargets(list);
}
list[pos - 1] = arrayView.element();
}
pos++;
}
target.setNumTargets(pos - 1);
} catch (RuntimeException e) {
throw new RuntimeException("Error parsing: " + arrayView.toString(), e);
}
return true;
}
}
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.generic.io.DelimitedInputFormat;
import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.AsciiLongArrayView;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling;
public class CustomImprovedDanglingPageRankInputFormat extends DelimitedInputFormat<VertexWithRankAndDangling> {
private AsciiLongArrayView arrayView = new AsciiLongArrayView();
private static final long DANGLING_MARKER = 1l;
private double initialRank;
@Override
public void configure(Configuration parameters) {
long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
initialRank = 1.0 / numVertices;
super.configure(parameters);
}
@Override
public boolean readRecord(VertexWithRankAndDangling target, byte[] bytes, int offset, int numBytes) {
arrayView.set(bytes, offset, numBytes);
try {
arrayView.next();
target.setVertexID(arrayView.element());
if (arrayView.next()) {
target.setDangling(arrayView.element() == DANGLING_MARKER);
} else {
target.setDangling(false);
}
} catch (NumberFormatException e) {
throw new RuntimeException("Error parsing " + arrayView.toString(), e);
}
target.setRank(initialRank);
return true;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom;
import com.google.common.base.Charsets;
import eu.stratosphere.pact.generic.io.FileOutputFormat;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling;
import java.io.IOException;
public class CustomPageWithRankOutFormat extends FileOutputFormat<VertexWithRankAndDangling> {
private final StringBuilder buffer = new StringBuilder();
@Override
public void writeRecord(VertexWithRankAndDangling record) throws IOException {
buffer.setLength(0);
buffer.append(record.getVertexID());
buffer.append('\t');
buffer.append(record.getRank());
buffer.append('\n');
byte[] bytes = buffer.toString().getBytes(Charsets.UTF_8);
stream.write(bytes);
}
}
......@@ -12,38 +12,38 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
/**
*
*/
public final class NodeWithAdjacencyList {
public final class VertexWithAdjacencyList {
private static final long[] EMPTY = new long[0];
private long nodeId;
private long vertexID;
private long[] targets;
private int numTargets;
public NodeWithAdjacencyList() {
public VertexWithAdjacencyList() {
this.targets = EMPTY;
}
public NodeWithAdjacencyList(long nodeId, long[] targets) {
this.nodeId = nodeId;
public VertexWithAdjacencyList(long vertexID, long[] targets) {
this.vertexID = vertexID;
this.targets = targets;
}
public long getNodeId() {
return nodeId;
public long getVertexID() {
return vertexID;
}
public void setNodeId(long nodeId) {
this.nodeId = nodeId;
public void setVertexID(long vertexID) {
this.vertexID = vertexID;
}
public long[] getTargets() {
......
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import java.io.IOException;
......@@ -23,29 +23,29 @@ import eu.stratosphere.pact.generic.types.TypeComparator;
/**
*
*/
public final class NodeWithRankAndDanglingComparator implements TypeComparator<NodeWithRankAndDangling> {
public final class VertexWithAdjacencyListComparator implements TypeComparator<VertexWithAdjacencyList> {
private long reference;
@Override
public int hash(NodeWithRankAndDangling record) {
final long value = record.getNodeId();
public int hash(VertexWithAdjacencyList record) {
final long value = record.getVertexID();
return 43 + (int) (value ^ value >>> 32);
}
@Override
public void setReference(NodeWithRankAndDangling toCompare) {
this.reference = toCompare.getNodeId();
public void setReference(VertexWithAdjacencyList toCompare) {
this.reference = toCompare.getVertexID();
}
@Override
public boolean equalToReference(NodeWithRankAndDangling candidate) {
return candidate.getNodeId() == this.reference;
public boolean equalToReference(VertexWithAdjacencyList candidate) {
return candidate.getVertexID() == this.reference;
}
@Override
public int compareToReference(TypeComparator<NodeWithRankAndDangling> referencedComparator) {
NodeWithRankAndDanglingComparator comp = (NodeWithRankAndDanglingComparator) referencedComparator;
public int compareToReference(TypeComparator<VertexWithAdjacencyList> referencedComparator) {
VertexWithAdjacencyListComparator comp = (VertexWithAdjacencyListComparator) referencedComparator;
final long diff = comp.reference - this.reference;
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
......@@ -72,8 +72,8 @@ public final class NodeWithRankAndDanglingComparator implements TypeComparator<N
}
@Override
public void putNormalizedKey(NodeWithRankAndDangling record, byte[] target, int offset, int len) {
final long value = record.getNodeId();
public void putNormalizedKey(VertexWithAdjacencyList record, byte[] target, int offset, int len) {
final long value = record.getVertexID();
if (len == 8) {
// default case, full normalized key
......@@ -122,7 +122,7 @@ public final class NodeWithRankAndDanglingComparator implements TypeComparator<N
}
@Override
public NodeWithRankAndDanglingComparator duplicate() {
return new NodeWithRankAndDanglingComparator();
public VertexWithAdjacencyListComparator duplicate() {
return new VertexWithAdjacencyListComparator();
}
}
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.generic.types.TypeComparatorFactory;
......@@ -20,7 +20,7 @@ import eu.stratosphere.pact.generic.types.TypeComparatorFactory;
/**
*
*/
public final class NodeWithRankAndDanglingComparatorFactory implements TypeComparatorFactory<NodeWithRankAndDangling> {
public final class VertexWithAdjacencyListComparatorFactory implements TypeComparatorFactory<VertexWithAdjacencyList> {
@Override
public void writeParametersToConfig(Configuration config) {}
......@@ -29,7 +29,7 @@ public final class NodeWithRankAndDanglingComparatorFactory implements TypeCompa
public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
@Override
public NodeWithRankAndDanglingComparator createComparator() {
return new NodeWithRankAndDanglingComparator();
public VertexWithAdjacencyListComparator createComparator() {
return new VertexWithAdjacencyListComparator();
}
}
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import java.io.IOException;
......@@ -24,32 +24,32 @@ import eu.stratosphere.pact.generic.types.TypeSerializer;
/**
*
*/
public final class NodeWithAdjacencyListSerializer implements TypeSerializer<NodeWithAdjacencyList> {
public final class VertexWithAdjacencyListSerializer implements TypeSerializer<VertexWithAdjacencyList> {
@Override
public NodeWithAdjacencyList createInstance() {
return new NodeWithAdjacencyList();
public VertexWithAdjacencyList createInstance() {
return new VertexWithAdjacencyList();
}
@Override
public NodeWithAdjacencyList createCopy(NodeWithAdjacencyList from) {
public VertexWithAdjacencyList createCopy(VertexWithAdjacencyList from) {
long[] targets = new long[from.getTargets().length];
System.arraycopy(from.getTargets(), 0, targets, 0, targets.length);
NodeWithAdjacencyList copy = new NodeWithAdjacencyList();
copy.setNodeId(from.getNodeId());
VertexWithAdjacencyList copy = new VertexWithAdjacencyList();
copy.setVertexID(from.getVertexID());
copy.setNumTargets(from.getNumTargets());
copy.setTargets(targets);
return copy;
}
@Override
public void copyTo(NodeWithAdjacencyList from, NodeWithAdjacencyList to) {
public void copyTo(VertexWithAdjacencyList from, VertexWithAdjacencyList to) {
if (to.getTargets().length < from.getTargets().length) {
to.setTargets(new long[from.getTargets().length]);
}
to.setNodeId(from.getNodeId());
to.setVertexID(from.getVertexID());
to.setNumTargets(from.getNumTargets());
System.arraycopy(from.getTargets(), 0, to.getTargets(), 0, from.getNumTargets());
}
......@@ -60,8 +60,8 @@ public final class NodeWithAdjacencyListSerializer implements TypeSerializer<Nod
}
@Override
public void serialize(NodeWithAdjacencyList record, DataOutputView target) throws IOException {
target.writeLong(record.getNodeId());
public void serialize(VertexWithAdjacencyList record, DataOutputView target) throws IOException {
target.writeLong(record.getVertexID());
final long[] targets = record.getTargets();
final int numTargets = record.getNumTargets();
......@@ -73,8 +73,8 @@ public final class NodeWithAdjacencyListSerializer implements TypeSerializer<Nod
}
@Override
public void deserialize(NodeWithAdjacencyList target, DataInputView source) throws IOException {
target.setNodeId(source.readLong());
public void deserialize(VertexWithAdjacencyList target, DataInputView source) throws IOException {
target.setVertexID(source.readLong());
final int numTargets = source.readInt();
long[] targets = target.getTargets();
......
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.generic.types.TypeSerializerFactory;
......@@ -20,9 +20,9 @@ import eu.stratosphere.pact.generic.types.TypeSerializerFactory;
/**
*
*/
public final class NodeWithRankAndDanglingSerializerFactory implements TypeSerializerFactory<NodeWithRankAndDangling> {
public final class VertexWithAdjacencyListSerializerFactory implements TypeSerializerFactory<VertexWithAdjacencyList> {
private static final NodeWithRankAndDanglingSerializer INSTANCE = new NodeWithRankAndDanglingSerializer();
private static final VertexWithAdjacencyListSerializer INSTANCE = new VertexWithAdjacencyListSerializer();
@Override
public void writeParametersToConfig(Configuration config) {}
......@@ -31,12 +31,12 @@ public final class NodeWithRankAndDanglingSerializerFactory implements TypeSeria
public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
@Override
public NodeWithRankAndDanglingSerializer getSerializer() {
public VertexWithAdjacencyListSerializer getSerializer() {
return INSTANCE;
}
@Override
public Class<NodeWithRankAndDangling> getDataType() {
return NodeWithRankAndDangling.class;
public Class<VertexWithAdjacencyList> getDataType() {
return VertexWithAdjacencyList.class;
}
}
......@@ -12,34 +12,34 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
/**
*
*/
public final class NodeWithRank {
public final class VertexWithRank {
private long nodeId;
private long vertexID;
private double rank;
public NodeWithRank() {
public VertexWithRank() {
}
public NodeWithRank(long nodeId, double rank) {
this.nodeId = nodeId;
public VertexWithRank(long vertexID, double rank) {
this.vertexID = vertexID;
this.rank = rank;
}
public long getNodeId() {
return nodeId;
public long getVertexID() {
return vertexID;
}
public void setNodeId(long nodeId) {
this.nodeId = nodeId;
public void setVertexID(long vertexID) {
this.vertexID = vertexID;
}
public double getRank() {
......
......@@ -12,37 +12,37 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
/**
*
*/
public final class NodeWithRankAndDangling {
public final class VertexWithRankAndDangling {
private long nodeId;
private long vertexID;
private double rank;
private boolean dangling;
public NodeWithRankAndDangling() {
public VertexWithRankAndDangling() {
}
public NodeWithRankAndDangling(long nodeId, double rank, boolean dangling) {
this.nodeId = nodeId;
public VertexWithRankAndDangling(long vertexID, double rank, boolean dangling) {
this.vertexID = vertexID;
this.rank = rank;
this.dangling = dangling;
}
public long getNodeId() {
return nodeId;
public long getVertexID() {
return vertexID;
}
public void setNodeId(long nodeId) {
this.nodeId = nodeId;
public void setVertexID(long vertexID) {
this.vertexID = vertexID;
}
public double getRank() {
......
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import java.io.IOException;
......@@ -23,29 +23,29 @@ import eu.stratosphere.pact.generic.types.TypeComparator;
/**
*
*/
public final class NodeWithAdjacencyListComparator implements TypeComparator<NodeWithAdjacencyList> {
public final class VertexWithRankAndDanglingComparator implements TypeComparator<VertexWithRankAndDangling> {
private long reference;
@Override
public int hash(NodeWithAdjacencyList record) {
final long value = record.getNodeId();
public int hash(VertexWithRankAndDangling record) {
final long value = record.getVertexID();
return 43 + (int) (value ^ value >>> 32);
}
@Override
public void setReference(NodeWithAdjacencyList toCompare) {
this.reference = toCompare.getNodeId();
public void setReference(VertexWithRankAndDangling toCompare) {
this.reference = toCompare.getVertexID();
}
@Override
public boolean equalToReference(NodeWithAdjacencyList candidate) {
return candidate.getNodeId() == this.reference;
public boolean equalToReference(VertexWithRankAndDangling candidate) {
return candidate.getVertexID() == this.reference;
}
@Override
public int compareToReference(TypeComparator<NodeWithAdjacencyList> referencedComparator) {
NodeWithAdjacencyListComparator comp = (NodeWithAdjacencyListComparator) referencedComparator;
public int compareToReference(TypeComparator<VertexWithRankAndDangling> referencedComparator) {
VertexWithRankAndDanglingComparator comp = (VertexWithRankAndDanglingComparator) referencedComparator;
final long diff = comp.reference - this.reference;
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
......@@ -72,8 +72,8 @@ public final class NodeWithAdjacencyListComparator implements TypeComparator<Nod
}
@Override
public void putNormalizedKey(NodeWithAdjacencyList record, byte[] target, int offset, int len) {
final long value = record.getNodeId();
public void putNormalizedKey(VertexWithRankAndDangling record, byte[] target, int offset, int len) {
final long value = record.getVertexID();
if (len == 8) {
// default case, full normalized key
......@@ -122,7 +122,7 @@ public final class NodeWithAdjacencyListComparator implements TypeComparator<Nod
}
@Override
public NodeWithAdjacencyListComparator duplicate() {
return new NodeWithAdjacencyListComparator();
public VertexWithRankAndDanglingComparator duplicate() {
return new VertexWithRankAndDanglingComparator();
}
}
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.generic.types.TypeComparatorFactory;
......@@ -20,7 +20,7 @@ import eu.stratosphere.pact.generic.types.TypeComparatorFactory;
/**
*
*/
public final class NodeWithAdjacencyListComparatorFactory implements TypeComparatorFactory<NodeWithAdjacencyList> {
public final class VertexWithRankAndDanglingComparatorFactory implements TypeComparatorFactory<VertexWithRankAndDangling> {
@Override
public void writeParametersToConfig(Configuration config) {}
......@@ -29,7 +29,7 @@ public final class NodeWithAdjacencyListComparatorFactory implements TypeCompara
public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
@Override
public NodeWithAdjacencyListComparator createComparator() {
return new NodeWithAdjacencyListComparator();
public VertexWithRankAndDanglingComparator createComparator() {
return new VertexWithRankAndDanglingComparator();
}
}
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import java.io.IOException;
......@@ -24,16 +24,16 @@ import eu.stratosphere.pact.generic.types.TypeSerializer;
/**
*
*/
public final class NodeWithRankAndDanglingSerializer implements TypeSerializer<NodeWithRankAndDangling> {
public final class VertexWithRankAndDanglingSerializer implements TypeSerializer<VertexWithRankAndDangling> {
@Override
public NodeWithRankAndDangling createInstance() {
return new NodeWithRankAndDangling();
public VertexWithRankAndDangling createInstance() {
return new VertexWithRankAndDangling();
}
@Override
public NodeWithRankAndDangling createCopy(NodeWithRankAndDangling from) {
NodeWithRankAndDangling n = new NodeWithRankAndDangling();
public VertexWithRankAndDangling createCopy(VertexWithRankAndDangling from) {
VertexWithRankAndDangling n = new VertexWithRankAndDangling();
copyTo(from, n);
return n;
}
......@@ -42,8 +42,8 @@ public final class NodeWithRankAndDanglingSerializer implements TypeSerializer<N
* @see eu.stratosphere.pact.generic.types.TypeSerializer#copyTo(java.lang.Object, java.lang.Object)
*/
@Override
public void copyTo(NodeWithRankAndDangling from, NodeWithRankAndDangling to) {
to.setNodeId(from.getNodeId());
public void copyTo(VertexWithRankAndDangling from, VertexWithRankAndDangling to) {
to.setVertexID(from.getVertexID());
to.setRank(from.getRank());
to.setDangling(from.isDangling());
}
......@@ -54,15 +54,15 @@ public final class NodeWithRankAndDanglingSerializer implements TypeSerializer<N
}
@Override
public void serialize(NodeWithRankAndDangling record, DataOutputView target) throws IOException {
target.writeLong(record.getNodeId());
public void serialize(VertexWithRankAndDangling record, DataOutputView target) throws IOException {
target.writeLong(record.getVertexID());
target.writeDouble(record.getRank());
target.writeBoolean(record.isDangling());
}
@Override
public void deserialize(NodeWithRankAndDangling target, DataInputView source) throws IOException {
target.setNodeId(source.readLong());
public void deserialize(VertexWithRankAndDangling target, DataInputView source) throws IOException {
target.setVertexID(source.readLong());
target.setRank(source.readDouble());
target.setDangling(source.readBoolean());
}
......
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.generic.types.TypeSerializerFactory;
......@@ -20,9 +20,9 @@ import eu.stratosphere.pact.generic.types.TypeSerializerFactory;
/**
*
*/
public final class NodeWithAdjacencyListSerializerFactory implements TypeSerializerFactory<NodeWithAdjacencyList> {
public final class VertexWithRankAndDanglingSerializerFactory implements TypeSerializerFactory<VertexWithRankAndDangling> {
private static final NodeWithAdjacencyListSerializer INSTANCE = new NodeWithAdjacencyListSerializer();
private static final VertexWithRankAndDanglingSerializer INSTANCE = new VertexWithRankAndDanglingSerializer();
@Override
public void writeParametersToConfig(Configuration config) {}
......@@ -31,12 +31,12 @@ public final class NodeWithAdjacencyListSerializerFactory implements TypeSeriali
public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
@Override
public NodeWithAdjacencyListSerializer getSerializer() {
public VertexWithRankAndDanglingSerializer getSerializer() {
return INSTANCE;
}
@Override
public Class<NodeWithAdjacencyList> getDataType() {
return NodeWithAdjacencyList.class;
public Class<VertexWithRankAndDangling> getDataType() {
return VertexWithRankAndDangling.class;
}
}
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import java.io.IOException;
......@@ -23,29 +23,29 @@ import eu.stratosphere.pact.generic.types.TypeComparator;
/**
*
*/
public final class NodeWithRankComparator implements TypeComparator<NodeWithRank> {
public final class VertexWithRankComparator implements TypeComparator<VertexWithRank> {
private long reference;
@Override
public int hash(NodeWithRank record) {
final long value = record.getNodeId();
public int hash(VertexWithRank record) {
final long value = record.getVertexID();
return 43 + (int) (value ^ value >>> 32);
}
@Override
public void setReference(NodeWithRank toCompare) {
this.reference = toCompare.getNodeId();
public void setReference(VertexWithRank toCompare) {
this.reference = toCompare.getVertexID();
}
@Override
public boolean equalToReference(NodeWithRank candidate) {
return candidate.getNodeId() == this.reference;
public boolean equalToReference(VertexWithRank candidate) {
return candidate.getVertexID() == this.reference;
}
@Override
public int compareToReference(TypeComparator<NodeWithRank> referencedComparator) {
NodeWithRankComparator comp = (NodeWithRankComparator) referencedComparator;
public int compareToReference(TypeComparator<VertexWithRank> referencedComparator) {
VertexWithRankComparator comp = (VertexWithRankComparator) referencedComparator;
final long diff = comp.reference - this.reference;
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
......@@ -72,8 +72,8 @@ public final class NodeWithRankComparator implements TypeComparator<NodeWithRank
}
@Override
public void putNormalizedKey(NodeWithRank record, byte[] target, int offset, int len) {
final long value = record.getNodeId();
public void putNormalizedKey(VertexWithRank record, byte[] target, int offset, int len) {
final long value = record.getVertexID();
if (len == 8) {
// default case, full normalized key
......@@ -122,7 +122,7 @@ public final class NodeWithRankComparator implements TypeComparator<NodeWithRank
}
@Override
public NodeWithRankComparator duplicate() {
return new NodeWithRankComparator();
public VertexWithRankComparator duplicate() {
return new VertexWithRankComparator();
}
}
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.generic.types.TypeComparatorFactory;
......@@ -20,7 +20,7 @@ import eu.stratosphere.pact.generic.types.TypeComparatorFactory;
/**
*
*/
public final class NodeWithRankComparatorFactory implements TypeComparatorFactory<NodeWithRank> {
public final class VertexWithRankComparatorFactory implements TypeComparatorFactory<VertexWithRank> {
@Override
public void writeParametersToConfig(Configuration config) {}
......@@ -29,7 +29,7 @@ public final class NodeWithRankComparatorFactory implements TypeComparatorFactor
public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
@Override
public NodeWithRankComparator createComparator() {
return new NodeWithRankComparator();
public VertexWithRankComparator createComparator() {
return new VertexWithRankComparator();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import eu.stratosphere.pact.generic.types.TypeComparator;
import eu.stratosphere.pact.generic.types.TypePairComparator;
import eu.stratosphere.pact.generic.types.TypePairComparatorFactory;
/**
*
*/
public class VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory
implements TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithAdjacencyList>
{
@Override
public VertexWithRankDanglingToVertexWithAdjacencyListPairComparator createComparator12(
TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithAdjacencyList> comparator2)
{
return new VertexWithRankDanglingToVertexWithAdjacencyListPairComparator();
}
@Override
public VertexWithAdjacencyListToVertexWithRankDanglingPairComparator createComparator21(
TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithAdjacencyList> comparator2)
{
return new VertexWithAdjacencyListToVertexWithRankDanglingPairComparator();
}
public static final class VertexWithRankDanglingToVertexWithAdjacencyListPairComparator
implements TypePairComparator<VertexWithRankAndDangling, VertexWithAdjacencyList>
{
private long reference;
@Override
public void setReference(VertexWithRankAndDangling reference) {
this.reference = reference.getVertexID();
}
@Override
public boolean equalToReference(VertexWithAdjacencyList candidate) {
return this.reference == candidate.getVertexID();
}
@Override
public int compareToReference(VertexWithAdjacencyList candidate) {
long diff = candidate.getVertexID() - this.reference;
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
}
public static final class VertexWithAdjacencyListToVertexWithRankDanglingPairComparator
implements TypePairComparator<VertexWithAdjacencyList, VertexWithRankAndDangling>
{
private long reference;
@Override
public void setReference(VertexWithAdjacencyList reference) {
this.reference = reference.getVertexID();
}
@Override
public boolean equalToReference(VertexWithRankAndDangling candidate) {
return this.reference == candidate.getVertexID();
}
@Override
public int compareToReference(VertexWithRankAndDangling candidate) {
long diff = candidate.getVertexID() - this.reference;
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import eu.stratosphere.pact.generic.types.TypeComparator;
import eu.stratosphere.pact.generic.types.TypePairComparator;
import eu.stratosphere.pact.generic.types.TypePairComparatorFactory;
/**
*
*/
public class VertexWithRankDanglingToVertexWithRankPairComparatorFactory
implements TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithRank>
{
@Override
public VertexWithRankDanglingToVertexWithRankComparator createComparator12(
TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithRank> comparator2)
{
return new VertexWithRankDanglingToVertexWithRankComparator();
}
@Override
public VertexWithRankToVertexWithRankDanglingPairComparator createComparator21(
TypeComparator<VertexWithRankAndDangling> comparator1, TypeComparator<VertexWithRank> comparator2)
{
return new VertexWithRankToVertexWithRankDanglingPairComparator();
}
public static final class VertexWithRankDanglingToVertexWithRankComparator
implements TypePairComparator<VertexWithRankAndDangling, VertexWithRank>
{
private long reference;
@Override
public void setReference(VertexWithRankAndDangling reference) {
this.reference = reference.getVertexID();
}
@Override
public boolean equalToReference(VertexWithRank candidate) {
return this.reference == candidate.getVertexID();
}
@Override
public int compareToReference(VertexWithRank candidate) {
long diff = candidate.getVertexID() - this.reference;
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
}
public static final class VertexWithRankToVertexWithRankDanglingPairComparator
implements TypePairComparator<VertexWithRank, VertexWithRankAndDangling>
{
private long reference;
@Override
public void setReference(VertexWithRank reference) {
this.reference = reference.getVertexID();
}
@Override
public boolean equalToReference(VertexWithRankAndDangling candidate) {
return this.reference == candidate.getVertexID();
}
@Override
public int compareToReference(VertexWithRankAndDangling candidate) {
long diff = candidate.getVertexID() - this.reference;
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
}
}
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import java.io.IOException;
......@@ -24,16 +24,16 @@ import eu.stratosphere.pact.generic.types.TypeSerializer;
/**
*
*/
public final class NodeWithRankSerializer implements TypeSerializer<NodeWithRank> {
public final class VertexWithRankSerializer implements TypeSerializer<VertexWithRank> {
@Override
public NodeWithRank createInstance() {
return new NodeWithRank();
public VertexWithRank createInstance() {
return new VertexWithRank();
}
@Override
public NodeWithRank createCopy(NodeWithRank from) {
NodeWithRank n = new NodeWithRank();
public VertexWithRank createCopy(VertexWithRank from) {
VertexWithRank n = new VertexWithRank();
copyTo(from, n);
return n;
}
......@@ -42,8 +42,8 @@ public final class NodeWithRankSerializer implements TypeSerializer<NodeWithRank
* @see eu.stratosphere.pact.generic.types.TypeSerializer#copyTo(java.lang.Object, java.lang.Object)
*/
@Override
public void copyTo(NodeWithRank from, NodeWithRank to) {
to.setNodeId(from.getNodeId());
public void copyTo(VertexWithRank from, VertexWithRank to) {
to.setVertexID(from.getVertexID());
to.setRank(from.getRank());
}
......@@ -53,14 +53,14 @@ public final class NodeWithRankSerializer implements TypeSerializer<NodeWithRank
}
@Override
public void serialize(NodeWithRank record, DataOutputView target) throws IOException {
target.writeLong(record.getNodeId());
public void serialize(VertexWithRank record, DataOutputView target) throws IOException {
target.writeLong(record.getVertexID());
target.writeDouble(record.getRank());
}
@Override
public void deserialize(NodeWithRank target, DataInputView source) throws IOException {
target.setNodeId(source.readLong());
public void deserialize(VertexWithRank target, DataInputView source) throws IOException {
target.setVertexID(source.readLong());
target.setRank(source.readDouble());
}
......
......@@ -12,7 +12,7 @@
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types;
package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.pact.generic.types.TypeSerializerFactory;
......@@ -20,9 +20,9 @@ import eu.stratosphere.pact.generic.types.TypeSerializerFactory;
/**
*
*/
public final class NodeWithRankSerializerFactory implements TypeSerializerFactory<NodeWithRank> {
public final class VertexWithRankSerializerFactory implements TypeSerializerFactory<VertexWithRank> {
private static final NodeWithRankSerializer INSTANCE = new NodeWithRankSerializer();
private static final VertexWithRankSerializer INSTANCE = new VertexWithRankSerializer();
@Override
public void writeParametersToConfig(Configuration config) {}
......@@ -31,12 +31,12 @@ public final class NodeWithRankSerializerFactory implements TypeSerializerFactor
public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {}
@Override
public NodeWithRankSerializer getSerializer() {
public VertexWithRankSerializer getSerializer() {
return INSTANCE;
}
@Override
public Class<NodeWithRank> getDataType() {
return NodeWithRank.class;
public Class<VertexWithRank> getDataType() {
return VertexWithRank.class;
}
}
......@@ -24,6 +24,7 @@ import org.junit.runners.Parameterized.Parameters;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.CustomCompensatableDanglingPageRank;
import eu.stratosphere.pact.test.util.TestBase;
@RunWith(Parameterized.class)
......@@ -102,7 +103,7 @@ public class DanglingPageRankITCase extends TestBase {
"0"
};
return CompensatableDanglingPageRank.getJobGraph(parameters);
return CustomCompensatableDanglingPageRank.getJobGraph(parameters);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册