From 3fe087d974a370d30a0e6c0742e609134664c5d9 Mon Sep 17 00:00:00 2001 From: sewen Date: Mon, 11 Feb 2013 02:30:10 +0100 Subject: [PATCH] Implemented dangling pagerank with generic data types. --- .../danglingpagerank/AsciiLongArrayView.java | 8 +- .../CompensatableDanglingPageRank.java | 4 +- .../CustomCompensatableDanglingPageRank.java | 305 ++++++++++++++++++ .../CustomCompensatableDotProductCoGroup.java | 110 +++++++ .../CustomCompensatableDotProductMatch.java | 73 +++++ .../custom/CustomCompensatingMap.java | 61 ++++ ...ustomImprovedAdjacencyListInputFormat.java | 46 +++ ...omImprovedDanglingPageRankInputFormat.java | 47 +++ .../custom/CustomPageWithRankOutFormat.java | 40 +++ .../types/VertexWithAdjacencyList.java} | 20 +- .../VertexWithAdjacencyListComparator.java} | 28 +- ...exWithAdjacencyListComparatorFactory.java} | 8 +- .../VertexWithAdjacencyListSerializer.java} | 26 +- ...exWithAdjacencyListSerializerFactory.java} | 12 +- .../types/VertexWithRank.java} | 20 +- .../types/VertexWithRankAndDangling.java} | 20 +- .../VertexWithRankAndDanglingComparator.java} | 28 +- ...WithRankAndDanglingComparatorFactory.java} | 8 +- .../VertexWithRankAndDanglingSerializer.java} | 24 +- ...WithRankAndDanglingSerializerFactory.java} | 12 +- .../types/VertexWithRankComparator.java} | 28 +- .../VertexWithRankComparatorFactory.java} | 8 +- ...ithAdjacencyListPairComparatorFactory.java | 87 +++++ ...ToVertexWithRankPairComparatorFactory.java | 87 +++++ .../types/VertexWithRankSerializer.java} | 24 +- .../VertexWithRankSerializerFactory.java} | 12 +- .../iterative/DanglingPageRankITCase.java | 3 +- 27 files changed, 1003 insertions(+), 146 deletions(-) create mode 100644 pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDanglingPageRank.java create mode 100644 pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java create mode 100644 pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java create mode 100644 pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java create mode 100644 pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedAdjacencyListInputFormat.java create mode 100644 pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedDanglingPageRankInputFormat.java create mode 100644 pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomPageWithRankOutFormat.java rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithAdjacencyList.java => custom/types/VertexWithAdjacencyList.java} (80%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithRankAndDanglingComparator.java => custom/types/VertexWithAdjacencyListComparator.java} (78%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithRankAndDanglingComparatorFactory.java => custom/types/VertexWithAdjacencyListComparatorFactory.java} (83%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithAdjacencyListSerializer.java => custom/types/VertexWithAdjacencyListSerializer.java} (77%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithRankAndDanglingSerializerFactory.java => custom/types/VertexWithAdjacencyListSerializerFactory.java} (76%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithRank.java => custom/types/VertexWithRank.java} (78%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithRankAndDangling.java => custom/types/VertexWithRankAndDangling.java} (78%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithAdjacencyListComparator.java => custom/types/VertexWithRankAndDanglingComparator.java} (77%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithAdjacencyListComparatorFactory.java => custom/types/VertexWithRankAndDanglingComparatorFactory.java} (82%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithRankAndDanglingSerializer.java => custom/types/VertexWithRankAndDanglingSerializer.java} (69%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithAdjacencyListSerializerFactory.java => custom/types/VertexWithRankAndDanglingSerializerFactory.java} (75%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithRankComparator.java => custom/types/VertexWithRankComparator.java} (79%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithRankComparatorFactory.java => custom/types/VertexWithRankComparatorFactory.java} (85%) create mode 100644 pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java create mode 100644 pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithRankSerializer.java => custom/types/VertexWithRankSerializer.java} (72%) rename pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/{types/NodeWithRankSerializerFactory.java => custom/types/VertexWithRankSerializerFactory.java} (79%) diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/AsciiLongArrayView.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/AsciiLongArrayView.java index 7b81885da74..f436e09f58c 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/AsciiLongArrayView.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/AsciiLongArrayView.java @@ -129,10 +129,10 @@ public class AsciiLongArrayView { } } - public double elementAsDouble() { - String token = new String(buffer, tokenOffset, tokenNumBytes, Charsets.US_ASCII); - return Double.valueOf(token); - } +// public double elementAsDouble() { +// String token = new String(buffer, tokenOffset, tokenNumBytes, Charsets.US_ASCII); +// return Double.valueOf(token); +// } @Override diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/CompensatableDanglingPageRank.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/CompensatableDanglingPageRank.java index d3728aecf28..60c5191f24f 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/CompensatableDanglingPageRank.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/CompensatableDanglingPageRank.java @@ -111,7 +111,7 @@ public class CompensatableDanglingPageRank { // --------------- the inputs --------------------- // page rank input - JobInputVertex pageWithRankInput = JobGraphUtils.createInput(DanglingPageGenerateRankInputFormat.class, + JobInputVertex pageWithRankInput = JobGraphUtils.createInput(ImprovedDanglingPageRankInputFormat.class, pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration()); pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); @@ -120,7 +120,7 @@ public class CompensatableDanglingPageRank { pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); // edges as adjacency list - JobInputVertex adjacencyListInput = JobGraphUtils.createInput(AdjacencyListInputFormat.class, + JobInputVertex adjacencyListInput = JobGraphUtils.createInput(ImprovedAdjacencyListInputFormat.class, adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration()); adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDanglingPageRank.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDanglingPageRank.java new file mode 100644 index 00000000000..cc5f2a954c1 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDanglingPageRank.java @@ -0,0 +1,305 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.nephele.configuration.GlobalConfiguration; +import eu.stratosphere.nephele.io.DistributionPattern; +import eu.stratosphere.nephele.io.channels.ChannelType; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.nephele.jobgraph.JobInputVertex; +import eu.stratosphere.nephele.jobgraph.JobOutputVertex; +import eu.stratosphere.nephele.jobgraph.JobTaskVertex; +import eu.stratosphere.pact.common.io.FileOutputFormat; +import eu.stratosphere.pact.generic.types.TypeComparatorFactory; +import eu.stratosphere.pact.generic.types.TypePairComparatorFactory; +import eu.stratosphere.pact.generic.types.TypeSerializerFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.DiffL1NormConvergenceCriterion; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyList; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyListComparatorFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyListSerializerFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRank; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDanglingComparatorFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDanglingSerializerFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankComparatorFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankDanglingToVertexWithRankPairComparatorFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankSerializerFactory; +import eu.stratosphere.pact.runtime.iterative.playing.JobGraphUtils; +import eu.stratosphere.pact.runtime.iterative.playing.PlayConstants; +import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask; +import eu.stratosphere.pact.runtime.iterative.task.IterationIntermediatePactTask; +import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; +import eu.stratosphere.pact.runtime.task.BuildSecondCachedMatchDriver; +import eu.stratosphere.pact.runtime.task.CoGroupDriver; +import eu.stratosphere.pact.runtime.task.DriverStrategy; +import eu.stratosphere.pact.runtime.task.MapDriver; +import eu.stratosphere.pact.runtime.task.util.LocalStrategy; +import eu.stratosphere.pact.runtime.task.util.TaskConfig; + +public class CustomCompensatableDanglingPageRank { + + private static final int NUM_FILE_HANDLES_PER_SORT = 64; + + private static final float SORT_SPILL_THRESHOLD = 0.85f; + + + private static TypeSerializerFactory vertexWithRankSerializer = new VertexWithRankSerializerFactory(); + + private static TypeSerializerFactory vertexWithRankAndDanglingSerializer = new VertexWithRankAndDanglingSerializerFactory(); + + private static TypeSerializerFactory vertexWithAdjacencyListSerializer = new VertexWithAdjacencyListSerializerFactory(); + + private static TypeComparatorFactory vertexWithRankComparator = new VertexWithRankComparatorFactory(); + + private static TypeComparatorFactory vertexWithRankAndDanglingComparator = new VertexWithRankAndDanglingComparatorFactory(); + + private static TypeComparatorFactory vertexWithAdjacencyListComparator = new VertexWithAdjacencyListComparatorFactory(); + + private static TypePairComparatorFactory matchComparator = + new VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory(); + + private static TypePairComparatorFactory coGroupComparator = + new VertexWithRankDanglingToVertexWithRankPairComparatorFactory(); + + + public static void main(String[] args) throws Exception { + String confPath = args.length >= 6 ? confPath = args[5] : PlayConstants.PLAY_DIR + "local-conf"; + + GlobalConfiguration.loadConfiguration(confPath); + Configuration conf = GlobalConfiguration.getConfiguration(); + + JobGraph jobGraph = getJobGraph(args); + JobGraphUtils.submit(jobGraph, conf); + } + + public static JobGraph getJobGraph(String[] args) throws Exception { + + int degreeOfParallelism = 2; + int numSubTasksPerInstance = degreeOfParallelism; + String pageWithRankInputPath = "file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank"; + String adjacencyListInputPath = "file://" + PlayConstants.PLAY_DIR + + "test-inputs/danglingpagerank/adjacencylists"; + String outputPath = "file:///tmp/stratosphere/iterations"; +// String confPath = PlayConstants.PLAY_DIR + "local-conf"; + int minorConsumer = 25; + int matchMemory = 50; + int coGroupSortMemory = 50; + int numIterations = 25; + long numVertices = 5; + long numDanglingVertices = 1; + + String failingWorkers = "1"; + int failingIteration = 2; + double messageLoss = 0.75; + + if (args.length >= 15) { + degreeOfParallelism = Integer.parseInt(args[0]); + numSubTasksPerInstance = Integer.parseInt(args[1]); + pageWithRankInputPath = args[2]; + adjacencyListInputPath = args[3]; + outputPath = args[4]; +// confPath = args[5]; + minorConsumer = Integer.parseInt(args[6]); + matchMemory = Integer.parseInt(args[7]); + coGroupSortMemory = Integer.parseInt(args[8]); + numIterations = Integer.parseInt(args[9]); + numVertices = Long.parseLong(args[10]); + numDanglingVertices = Long.parseLong(args[11]); + failingWorkers = args[12]; + failingIteration = Integer.parseInt(args[13]); + messageLoss = Double.parseDouble(args[14]); + } + + JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank"); + + // --------------- the inputs --------------------- + + // page rank input + JobInputVertex pageWithRankInput = JobGraphUtils.createInput(CustomImprovedDanglingPageRankInputFormat.class, + pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); + TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration()); + pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); + pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0); + pageWithRankInputConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer); + pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); + + // edges as adjacency list + JobInputVertex adjacencyListInput = JobGraphUtils.createInput(CustomImprovedAdjacencyListInputFormat.class, + adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); + TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration()); + adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); + adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer); + adjacencyListInputConfig.setOutputComparator(vertexWithAdjacencyListComparator, 0); + + // --------------- the head --------------------- + JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph, + degreeOfParallelism, numSubTasksPerInstance); + TaskConfig headConfig = new TaskConfig(head.getConfiguration()); + + // initial input / partial solution + headConfig.addInputToGroup(0); + headConfig.setIterationHeadPartialSolutionInputIndex(0); + headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0); + headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0); + headConfig.setInputLocalStrategy(0, LocalStrategy.SORT); + headConfig.setMemoryInput(0, minorConsumer * JobGraphUtils.MEGABYTE); + headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT); + headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD); + + // back channel / iterations + headConfig.setBackChannelMemory(minorConsumer * JobGraphUtils.MEGABYTE); + + // output into iteration + headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer); + headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + + // final output + TaskConfig headFinalOutConfig = new TaskConfig(new Configuration()); + headFinalOutConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer); + headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig); + + // the sync + headConfig.setIterationHeadIndexOfSyncOutput(3); + headConfig.setNumberOfIterations(numIterations); + + // the driver + headConfig.setDriver(MapDriver.class); + headConfig.setDriverStrategy(DriverStrategy.MAP); + headConfig.setStubClass(CustomCompensatingMap.class); + headConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); + headConfig.setStubParameter("compensation.failingWorker", failingWorkers); + headConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration)); + headConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss)); + + // --------------- the join --------------------- + + JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, + "IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance); + TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration()); +// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class); + intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class); + intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); + intermediateConfig.setMemoryDriver(matchMemory * JobGraphUtils.MEGABYTE); + intermediateConfig.addInputToGroup(0); + intermediateConfig.addInputToGroup(1); + intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0); + intermediateConfig.setInputSerializer(vertexWithAdjacencyListSerializer, 1); + intermediateConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0); + intermediateConfig.setDriverComparator(vertexWithAdjacencyListComparator, 1); + intermediateConfig.setDriverPairComparator(matchComparator); + + intermediateConfig.setOutputSerializer(vertexWithRankSerializer); + intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); + intermediateConfig.setOutputComparator(vertexWithRankComparator, 0); + + intermediateConfig.setStubClass(CustomCompensatableDotProductMatch.class); + intermediateConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); + intermediateConfig.setStubParameter("compensation.failingWorker", failingWorkers); + intermediateConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration)); + intermediateConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss)); + + // ---------------- the tail (co group) -------------------- + + JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph, + degreeOfParallelism, numSubTasksPerInstance); + TaskConfig tailConfig = new TaskConfig(tail.getConfiguration()); + // TODO we need to combine! + + // inputs and driver + tailConfig.setDriver(CoGroupDriver.class); + tailConfig.setDriverStrategy(DriverStrategy.CO_GROUP); + tailConfig.addInputToGroup(0); + tailConfig.addInputToGroup(1); + tailConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0); + tailConfig.setInputSerializer(vertexWithRankSerializer, 1); + tailConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0); + tailConfig.setDriverComparator(vertexWithRankComparator, 1); + tailConfig.setDriverPairComparator(coGroupComparator); + tailConfig.setInputAsynchronouslyMaterialized(0, true); + tailConfig.setInputMaterializationMemory(0, minorConsumer * JobGraphUtils.MEGABYTE); + tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT); + tailConfig.setInputComparator(vertexWithRankComparator, 1); + tailConfig.setMemoryInput(1, coGroupSortMemory * JobGraphUtils.MEGABYTE); + tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT); + tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD); + + // output + tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer); + + // the stub + tailConfig.setStubClass(CustomCompensatableDotProductCoGroup.class); + tailConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); + tailConfig.setStubParameter("pageRank.numDanglingVertices", String.valueOf(numDanglingVertices)); + tailConfig.setStubParameter("compensation.failingWorker", failingWorkers); + tailConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration)); + tailConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss)); + + // --------------- the output --------------------- + + JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism, + numSubTasksPerInstance); + TaskConfig outputConfig = new TaskConfig(output.getConfiguration()); + outputConfig.addInputToGroup(0); + outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0); + outputConfig.setStubClass(CustomPageWithRankOutFormat.class); + outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath); + + // --------------- the auxiliaries --------------------- + + JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", + degreeOfParallelism, numSubTasksPerInstance); + + JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism); + TaskConfig syncConfig = new TaskConfig(sync.getConfiguration()); + syncConfig.setNumberOfIterations(numIterations); + syncConfig.setConvergenceCriterion(DiffL1NormConvergenceCriterion.class); + + // --------------- the wiring --------------------- + + JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE); + + JobGraphUtils.connect(head, intermediate, ChannelType.INMEMORY, DistributionPattern.POINTWISE); + intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1); + + JobGraphUtils.connect(adjacencyListInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE); + + JobGraphUtils.connect(head, tail, ChannelType.NETWORK, DistributionPattern.POINTWISE); + JobGraphUtils.connect(intermediate, tail, ChannelType.NETWORK, DistributionPattern.BIPARTITE); + tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1); + tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism); + + JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE); + JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.INMEMORY, DistributionPattern.POINTWISE); + + JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE); + + fakeTailOutput.setVertexToShareInstancesWith(tail); + tail.setVertexToShareInstancesWith(head); + pageWithRankInput.setVertexToShareInstancesWith(head); + adjacencyListInput.setVertexToShareInstancesWith(head); + intermediate.setVertexToShareInstancesWith(head); + output.setVertexToShareInstancesWith(head); + sync.setVertexToShareInstancesWith(head); + + return jobGraph; + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java new file mode 100644 index 00000000000..b500d321a62 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java @@ -0,0 +1,110 @@ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.pact.common.stubs.Collector; +import eu.stratosphere.pact.generic.stub.AbstractStub; +import eu.stratosphere.pact.generic.stub.GenericCoGrouper; +import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.DiffL1NormConvergenceCriterion; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.PageRankStats; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.PageRankStatsAggregator; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRank; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; +import eu.stratosphere.pact.runtime.iterative.concurrent.IterationContext; + +import java.util.Iterator; +import java.util.Set; + +public class CustomCompensatableDotProductCoGroup extends AbstractStub implements GenericCoGrouper { + + private VertexWithRankAndDangling accumulator = new VertexWithRankAndDangling(); + + private PageRankStatsAggregator aggregator = + (PageRankStatsAggregator) new DiffL1NormConvergenceCriterion().createAggregator(); + + private long numVertices; + + private long numDanglingVertices; + + private double dampingFactor; + + private double danglingRankFactor; + + private static final double BETA = 0.85; + + private int workerIndex; + + private int currentIteration; + + private int failingIteration; + + private Set failingWorkers; + + @Override + public void open(Configuration parameters) throws Exception { + workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters); + currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters); + failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters); + failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters); + + numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters); + numDanglingVertices = ConfigUtils.asLong("pageRank.numDanglingVertices", parameters); + + aggregator.reset(); + + dampingFactor = (1d - BETA) / (double) numVertices; + + if (currentIteration == 1) { + danglingRankFactor = BETA * (double) numDanglingVertices / ((double) numVertices * (double) numVertices); + } else { + PageRankStats previousAggregate = (PageRankStats) IterationContext.instance().getGlobalAggregate( + workerIndex); + danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices; + } + } + + @Override + public void coGroup(Iterator currentPageRankIterator, Iterator partialRanks, + Collector collector) + { + if (!currentPageRankIterator.hasNext()) { + long missingVertex = partialRanks.next().getVertexID(); + throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!"); + } + + VertexWithRankAndDangling currentPageRank = currentPageRankIterator.next(); + + long edges = 0; + double summedRank = 0; + while (partialRanks.hasNext()) { + summedRank += partialRanks.next().getRank(); + edges++; + } + + double rank = BETA * summedRank + dampingFactor + danglingRankFactor; + + double currentRank = currentPageRank.getRank(); + boolean isDangling = currentPageRank.isDangling(); + + double danglingRankToAggregate = isDangling ? rank : 0; + long danglingVerticesToAggregate = isDangling ? 1 : 0; + + double diff = Math.abs(currentRank - rank); + + aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges, summedRank, 0); + + accumulator.setVertexID(currentPageRank.getVertexID()); + accumulator.setRank(rank); + accumulator.setDangling(isDangling); + + collector.collect(accumulator); + } + + @Override + public void close() throws Exception { + if (currentIteration == failingIteration && failingWorkers.contains(workerIndex)) { + aggregator.reset(); + } + IterationContext.instance().setAggregate(workerIndex, aggregator.getAggregate()); + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java new file mode 100644 index 00000000000..e7d022e3271 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java @@ -0,0 +1,73 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.pact.common.stubs.Collector; +import eu.stratosphere.pact.generic.stub.AbstractStub; +import eu.stratosphere.pact.generic.stub.GenericMatcher; +import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyList; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRank; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; + +import java.util.Random; +import java.util.Set; + +public class CustomCompensatableDotProductMatch extends AbstractStub implements + GenericMatcher { + + private VertexWithRank record = new VertexWithRank(); + + private Random random = new Random(); + + private double messageLoss; + + private boolean isFailure; + + @Override + public void open(Configuration parameters) throws Exception { + int workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters); + int currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters); + int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters); + Set failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters); + isFailure = currentIteration == failingIteration && failingWorkers.contains(workerIndex); + messageLoss = ConfigUtils.asDouble("compensation.messageLoss", parameters); + } + + @Override + public void match(VertexWithRankAndDangling pageWithRank, VertexWithAdjacencyList adjacencyList, Collector collector) + throws Exception + { + double rank = pageWithRank.getRank(); + long[] adjacentNeighbors = adjacencyList.getTargets(); + int numNeighbors = adjacencyList.getNumTargets(); + + double rankToDistribute = rank / (double) numNeighbors; + record.setRank(rankToDistribute); + + for (int n = 0; n < numNeighbors; n++) { + record.setVertexID(adjacentNeighbors[n]); + if (isFailure) { + if (random.nextDouble() >= messageLoss) { + collector.collect(record); + } + } else { + collector.collect(record); + } + } + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java new file mode 100644 index 00000000000..ccf8a772dcb --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java @@ -0,0 +1,61 @@ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.pact.common.stubs.Collector; +import eu.stratosphere.pact.generic.stub.AbstractStub; +import eu.stratosphere.pact.generic.stub.GenericMapper; +import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.PageRankStats; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; +import eu.stratosphere.pact.runtime.iterative.concurrent.IterationContext; + +import java.util.Set; + +public class CustomCompensatingMap extends AbstractStub implements GenericMapper { + + private boolean isFailureIteration; + + private boolean isFailingWorker; + + private double uniformRank; + + private double rescaleFactor; + + @Override + public void open(Configuration parameters) throws Exception { + + + int currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters); + int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters); + isFailureIteration = currentIteration == failingIteration + 1; + + int workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters); + Set failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters); + isFailingWorker = failingWorkers.contains(workerIndex); + + long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters); + + if (currentIteration > 1) { + PageRankStats stats = (PageRankStats) IterationContext.instance().getGlobalAggregate(workerIndex); + + uniformRank = 1d / (double) numVertices; + double lostMassFactor = (numVertices - stats.numVertices()) / (double) numVertices; + rescaleFactor = (1 - lostMassFactor) / stats.rank(); + } + } + + @Override + public void map(VertexWithRankAndDangling pageWithRank, Collector out) throws Exception { + + if (isFailureIteration) { + double rank = pageWithRank.getRank(); + + if (isFailingWorker) { + pageWithRank.setRank(uniformRank); + } else { + pageWithRank.setRank(rank * rescaleFactor); + } + } + out.collect(pageWithRank); + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedAdjacencyListInputFormat.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedAdjacencyListInputFormat.java new file mode 100644 index 00000000000..ed4ec2c4c6b --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedAdjacencyListInputFormat.java @@ -0,0 +1,46 @@ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.pact.generic.io.DelimitedInputFormat; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.AsciiLongArrayView; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyList; + +public class CustomImprovedAdjacencyListInputFormat extends DelimitedInputFormat { + + private final AsciiLongArrayView arrayView = new AsciiLongArrayView(); + + @Override + public boolean readRecord(VertexWithAdjacencyList target, byte[] bytes, int offset, int numBytes) { + + if (numBytes == 0) { + return false; + } + + arrayView.set(bytes, offset, numBytes); + + long[] list = target.getTargets(); + + try { + + int pos = 0; + while (arrayView.next()) { + + if (pos == 0) { + target.setVertexID(arrayView.element()); + } else { + if (list.length <= pos - 1) { + list = new long[list.length < 16 ? 16 : list.length * 2]; + target.setTargets(list); + } + list[pos - 1] = arrayView.element(); + } + pos++; + } + + target.setNumTargets(pos - 1); + } catch (RuntimeException e) { + throw new RuntimeException("Error parsing: " + arrayView.toString(), e); + } + + return true; + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedDanglingPageRankInputFormat.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedDanglingPageRankInputFormat.java new file mode 100644 index 00000000000..d9cf6e409c8 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedDanglingPageRankInputFormat.java @@ -0,0 +1,47 @@ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.pact.generic.io.DelimitedInputFormat; +import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.AsciiLongArrayView; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; + +public class CustomImprovedDanglingPageRankInputFormat extends DelimitedInputFormat { + + private AsciiLongArrayView arrayView = new AsciiLongArrayView(); + + private static final long DANGLING_MARKER = 1l; + + private double initialRank; + + @Override + public void configure(Configuration parameters) { + long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters); + initialRank = 1.0 / numVertices; + super.configure(parameters); + } + + @Override + public boolean readRecord(VertexWithRankAndDangling target, byte[] bytes, int offset, int numBytes) { + + arrayView.set(bytes, offset, numBytes); + + try { + arrayView.next(); + target.setVertexID(arrayView.element()); + + if (arrayView.next()) { + target.setDangling(arrayView.element() == DANGLING_MARKER); + } else { + target.setDangling(false); + } + + } catch (NumberFormatException e) { + throw new RuntimeException("Error parsing " + arrayView.toString(), e); + } + + target.setRank(initialRank); + + return true; + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomPageWithRankOutFormat.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomPageWithRankOutFormat.java new file mode 100644 index 00000000000..ac705d103a8 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomPageWithRankOutFormat.java @@ -0,0 +1,40 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import com.google.common.base.Charsets; + +import eu.stratosphere.pact.generic.io.FileOutputFormat; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; + +import java.io.IOException; + +public class CustomPageWithRankOutFormat extends FileOutputFormat { + + private final StringBuilder buffer = new StringBuilder(); + + @Override + public void writeRecord(VertexWithRankAndDangling record) throws IOException { + buffer.setLength(0); + buffer.append(record.getVertexID()); + buffer.append('\t'); + buffer.append(record.getRank()); + buffer.append('\n'); + + byte[] bytes = buffer.toString().getBytes(Charsets.UTF_8); + stream.write(bytes); + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyList.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyList.java similarity index 80% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyList.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyList.java index 5021676cc2a..1624793f629 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyList.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyList.java @@ -12,38 +12,38 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; /** * */ -public final class NodeWithAdjacencyList { +public final class VertexWithAdjacencyList { private static final long[] EMPTY = new long[0]; - private long nodeId; + private long vertexID; private long[] targets; private int numTargets; - public NodeWithAdjacencyList() { + public VertexWithAdjacencyList() { this.targets = EMPTY; } - public NodeWithAdjacencyList(long nodeId, long[] targets) { - this.nodeId = nodeId; + public VertexWithAdjacencyList(long vertexID, long[] targets) { + this.vertexID = vertexID; this.targets = targets; } - public long getNodeId() { - return nodeId; + public long getVertexID() { + return vertexID; } - public void setNodeId(long nodeId) { - this.nodeId = nodeId; + public void setVertexID(long vertexID) { + this.vertexID = vertexID; } public long[] getTargets() { diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingComparator.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListComparator.java similarity index 78% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingComparator.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListComparator.java index ee5c6e4e70b..8811d0cc5e8 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingComparator.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListComparator.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -23,29 +23,29 @@ import eu.stratosphere.pact.generic.types.TypeComparator; /** * */ -public final class NodeWithRankAndDanglingComparator implements TypeComparator { +public final class VertexWithAdjacencyListComparator implements TypeComparator { private long reference; @Override - public int hash(NodeWithRankAndDangling record) { - final long value = record.getNodeId(); + public int hash(VertexWithAdjacencyList record) { + final long value = record.getVertexID(); return 43 + (int) (value ^ value >>> 32); } @Override - public void setReference(NodeWithRankAndDangling toCompare) { - this.reference = toCompare.getNodeId(); + public void setReference(VertexWithAdjacencyList toCompare) { + this.reference = toCompare.getVertexID(); } @Override - public boolean equalToReference(NodeWithRankAndDangling candidate) { - return candidate.getNodeId() == this.reference; + public boolean equalToReference(VertexWithAdjacencyList candidate) { + return candidate.getVertexID() == this.reference; } @Override - public int compareToReference(TypeComparator referencedComparator) { - NodeWithRankAndDanglingComparator comp = (NodeWithRankAndDanglingComparator) referencedComparator; + public int compareToReference(TypeComparator referencedComparator) { + VertexWithAdjacencyListComparator comp = (VertexWithAdjacencyListComparator) referencedComparator; final long diff = comp.reference - this.reference; return diff < 0 ? -1 : diff > 0 ? 1 : 0; } @@ -72,8 +72,8 @@ public final class NodeWithRankAndDanglingComparator implements TypeComparator { +public final class VertexWithAdjacencyListComparatorFactory implements TypeComparatorFactory { @Override public void writeParametersToConfig(Configuration config) {} @@ -29,7 +29,7 @@ public final class NodeWithRankAndDanglingComparatorFactory implements TypeCompa public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithRankAndDanglingComparator createComparator() { - return new NodeWithRankAndDanglingComparator(); + public VertexWithAdjacencyListComparator createComparator() { + return new VertexWithAdjacencyListComparator(); } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListSerializer.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListSerializer.java similarity index 77% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListSerializer.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListSerializer.java index 5d552dcad60..f42a3cec208 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListSerializer.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListSerializer.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -24,32 +24,32 @@ import eu.stratosphere.pact.generic.types.TypeSerializer; /** * */ -public final class NodeWithAdjacencyListSerializer implements TypeSerializer { +public final class VertexWithAdjacencyListSerializer implements TypeSerializer { @Override - public NodeWithAdjacencyList createInstance() { - return new NodeWithAdjacencyList(); + public VertexWithAdjacencyList createInstance() { + return new VertexWithAdjacencyList(); } @Override - public NodeWithAdjacencyList createCopy(NodeWithAdjacencyList from) { + public VertexWithAdjacencyList createCopy(VertexWithAdjacencyList from) { long[] targets = new long[from.getTargets().length]; System.arraycopy(from.getTargets(), 0, targets, 0, targets.length); - NodeWithAdjacencyList copy = new NodeWithAdjacencyList(); - copy.setNodeId(from.getNodeId()); + VertexWithAdjacencyList copy = new VertexWithAdjacencyList(); + copy.setVertexID(from.getVertexID()); copy.setNumTargets(from.getNumTargets()); copy.setTargets(targets); return copy; } @Override - public void copyTo(NodeWithAdjacencyList from, NodeWithAdjacencyList to) { + public void copyTo(VertexWithAdjacencyList from, VertexWithAdjacencyList to) { if (to.getTargets().length < from.getTargets().length) { to.setTargets(new long[from.getTargets().length]); } - to.setNodeId(from.getNodeId()); + to.setVertexID(from.getVertexID()); to.setNumTargets(from.getNumTargets()); System.arraycopy(from.getTargets(), 0, to.getTargets(), 0, from.getNumTargets()); } @@ -60,8 +60,8 @@ public final class NodeWithAdjacencyListSerializer implements TypeSerializer { +public final class VertexWithAdjacencyListSerializerFactory implements TypeSerializerFactory { - private static final NodeWithRankAndDanglingSerializer INSTANCE = new NodeWithRankAndDanglingSerializer(); + private static final VertexWithAdjacencyListSerializer INSTANCE = new VertexWithAdjacencyListSerializer(); @Override public void writeParametersToConfig(Configuration config) {} @@ -31,12 +31,12 @@ public final class NodeWithRankAndDanglingSerializerFactory implements TypeSeria public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithRankAndDanglingSerializer getSerializer() { + public VertexWithAdjacencyListSerializer getSerializer() { return INSTANCE; } @Override - public Class getDataType() { - return NodeWithRankAndDangling.class; + public Class getDataType() { + return VertexWithAdjacencyList.class; } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRank.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRank.java similarity index 78% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRank.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRank.java index 5b2290cbaab..b568c09189a 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRank.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRank.java @@ -12,34 +12,34 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; /** * */ -public final class NodeWithRank { +public final class VertexWithRank { - private long nodeId; + private long vertexID; private double rank; - public NodeWithRank() { + public VertexWithRank() { } - public NodeWithRank(long nodeId, double rank) { - this.nodeId = nodeId; + public VertexWithRank(long vertexID, double rank) { + this.vertexID = vertexID; this.rank = rank; } - public long getNodeId() { - return nodeId; + public long getVertexID() { + return vertexID; } - public void setNodeId(long nodeId) { - this.nodeId = nodeId; + public void setVertexID(long vertexID) { + this.vertexID = vertexID; } public double getRank() { diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDangling.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDangling.java similarity index 78% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDangling.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDangling.java index 18d4a22d5da..de7c261b5b1 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDangling.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDangling.java @@ -12,37 +12,37 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; /** * */ -public final class NodeWithRankAndDangling { +public final class VertexWithRankAndDangling { - private long nodeId; + private long vertexID; private double rank; private boolean dangling; - public NodeWithRankAndDangling() { + public VertexWithRankAndDangling() { } - public NodeWithRankAndDangling(long nodeId, double rank, boolean dangling) { - this.nodeId = nodeId; + public VertexWithRankAndDangling(long vertexID, double rank, boolean dangling) { + this.vertexID = vertexID; this.rank = rank; this.dangling = dangling; } - public long getNodeId() { - return nodeId; + public long getVertexID() { + return vertexID; } - public void setNodeId(long nodeId) { - this.nodeId = nodeId; + public void setVertexID(long vertexID) { + this.vertexID = vertexID; } public double getRank() { diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListComparator.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingComparator.java similarity index 77% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListComparator.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingComparator.java index 3f7234825f8..37706a2608d 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListComparator.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingComparator.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -23,29 +23,29 @@ import eu.stratosphere.pact.generic.types.TypeComparator; /** * */ -public final class NodeWithAdjacencyListComparator implements TypeComparator { +public final class VertexWithRankAndDanglingComparator implements TypeComparator { private long reference; @Override - public int hash(NodeWithAdjacencyList record) { - final long value = record.getNodeId(); + public int hash(VertexWithRankAndDangling record) { + final long value = record.getVertexID(); return 43 + (int) (value ^ value >>> 32); } @Override - public void setReference(NodeWithAdjacencyList toCompare) { - this.reference = toCompare.getNodeId(); + public void setReference(VertexWithRankAndDangling toCompare) { + this.reference = toCompare.getVertexID(); } @Override - public boolean equalToReference(NodeWithAdjacencyList candidate) { - return candidate.getNodeId() == this.reference; + public boolean equalToReference(VertexWithRankAndDangling candidate) { + return candidate.getVertexID() == this.reference; } @Override - public int compareToReference(TypeComparator referencedComparator) { - NodeWithAdjacencyListComparator comp = (NodeWithAdjacencyListComparator) referencedComparator; + public int compareToReference(TypeComparator referencedComparator) { + VertexWithRankAndDanglingComparator comp = (VertexWithRankAndDanglingComparator) referencedComparator; final long diff = comp.reference - this.reference; return diff < 0 ? -1 : diff > 0 ? 1 : 0; } @@ -72,8 +72,8 @@ public final class NodeWithAdjacencyListComparator implements TypeComparator { +public final class VertexWithRankAndDanglingComparatorFactory implements TypeComparatorFactory { @Override public void writeParametersToConfig(Configuration config) {} @@ -29,7 +29,7 @@ public final class NodeWithAdjacencyListComparatorFactory implements TypeCompara public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithAdjacencyListComparator createComparator() { - return new NodeWithAdjacencyListComparator(); + public VertexWithRankAndDanglingComparator createComparator() { + return new VertexWithRankAndDanglingComparator(); } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingSerializer.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingSerializer.java similarity index 69% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingSerializer.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingSerializer.java index 115ec662b32..7f1f4407f1a 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingSerializer.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingSerializer.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -24,16 +24,16 @@ import eu.stratosphere.pact.generic.types.TypeSerializer; /** * */ -public final class NodeWithRankAndDanglingSerializer implements TypeSerializer { +public final class VertexWithRankAndDanglingSerializer implements TypeSerializer { @Override - public NodeWithRankAndDangling createInstance() { - return new NodeWithRankAndDangling(); + public VertexWithRankAndDangling createInstance() { + return new VertexWithRankAndDangling(); } @Override - public NodeWithRankAndDangling createCopy(NodeWithRankAndDangling from) { - NodeWithRankAndDangling n = new NodeWithRankAndDangling(); + public VertexWithRankAndDangling createCopy(VertexWithRankAndDangling from) { + VertexWithRankAndDangling n = new VertexWithRankAndDangling(); copyTo(from, n); return n; } @@ -42,8 +42,8 @@ public final class NodeWithRankAndDanglingSerializer implements TypeSerializer { +public final class VertexWithRankAndDanglingSerializerFactory implements TypeSerializerFactory { - private static final NodeWithAdjacencyListSerializer INSTANCE = new NodeWithAdjacencyListSerializer(); + private static final VertexWithRankAndDanglingSerializer INSTANCE = new VertexWithRankAndDanglingSerializer(); @Override public void writeParametersToConfig(Configuration config) {} @@ -31,12 +31,12 @@ public final class NodeWithAdjacencyListSerializerFactory implements TypeSeriali public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithAdjacencyListSerializer getSerializer() { + public VertexWithRankAndDanglingSerializer getSerializer() { return INSTANCE; } @Override - public Class getDataType() { - return NodeWithAdjacencyList.class; + public Class getDataType() { + return VertexWithRankAndDangling.class; } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankComparator.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankComparator.java similarity index 79% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankComparator.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankComparator.java index 2956eb96370..a92f5cce924 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankComparator.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankComparator.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -23,29 +23,29 @@ import eu.stratosphere.pact.generic.types.TypeComparator; /** * */ -public final class NodeWithRankComparator implements TypeComparator { +public final class VertexWithRankComparator implements TypeComparator { private long reference; @Override - public int hash(NodeWithRank record) { - final long value = record.getNodeId(); + public int hash(VertexWithRank record) { + final long value = record.getVertexID(); return 43 + (int) (value ^ value >>> 32); } @Override - public void setReference(NodeWithRank toCompare) { - this.reference = toCompare.getNodeId(); + public void setReference(VertexWithRank toCompare) { + this.reference = toCompare.getVertexID(); } @Override - public boolean equalToReference(NodeWithRank candidate) { - return candidate.getNodeId() == this.reference; + public boolean equalToReference(VertexWithRank candidate) { + return candidate.getVertexID() == this.reference; } @Override - public int compareToReference(TypeComparator referencedComparator) { - NodeWithRankComparator comp = (NodeWithRankComparator) referencedComparator; + public int compareToReference(TypeComparator referencedComparator) { + VertexWithRankComparator comp = (VertexWithRankComparator) referencedComparator; final long diff = comp.reference - this.reference; return diff < 0 ? -1 : diff > 0 ? 1 : 0; } @@ -72,8 +72,8 @@ public final class NodeWithRankComparator implements TypeComparator { +public final class VertexWithRankComparatorFactory implements TypeComparatorFactory { @Override public void writeParametersToConfig(Configuration config) {} @@ -29,7 +29,7 @@ public final class NodeWithRankComparatorFactory implements TypeComparatorFactor public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithRankComparator createComparator() { - return new NodeWithRankComparator(); + public VertexWithRankComparator createComparator() { + return new VertexWithRankComparator(); } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java new file mode 100644 index 00000000000..60f457d061d --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java @@ -0,0 +1,87 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; + +import eu.stratosphere.pact.generic.types.TypeComparator; +import eu.stratosphere.pact.generic.types.TypePairComparator; +import eu.stratosphere.pact.generic.types.TypePairComparatorFactory; + + +/** + * + */ +public class VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory + implements TypePairComparatorFactory +{ + + @Override + public VertexWithRankDanglingToVertexWithAdjacencyListPairComparator createComparator12( + TypeComparator comparator1, TypeComparator comparator2) + { + return new VertexWithRankDanglingToVertexWithAdjacencyListPairComparator(); + } + + @Override + public VertexWithAdjacencyListToVertexWithRankDanglingPairComparator createComparator21( + TypeComparator comparator1, TypeComparator comparator2) + { + return new VertexWithAdjacencyListToVertexWithRankDanglingPairComparator(); + } + + + public static final class VertexWithRankDanglingToVertexWithAdjacencyListPairComparator + implements TypePairComparator + { + private long reference; + + @Override + public void setReference(VertexWithRankAndDangling reference) { + this.reference = reference.getVertexID(); + } + + @Override + public boolean equalToReference(VertexWithAdjacencyList candidate) { + return this.reference == candidate.getVertexID(); + } + + @Override + public int compareToReference(VertexWithAdjacencyList candidate) { + long diff = candidate.getVertexID() - this.reference; + return diff < 0 ? -1 : diff > 0 ? 1 : 0; + } + } + + public static final class VertexWithAdjacencyListToVertexWithRankDanglingPairComparator + implements TypePairComparator + { + private long reference; + + @Override + public void setReference(VertexWithAdjacencyList reference) { + this.reference = reference.getVertexID(); + } + + @Override + public boolean equalToReference(VertexWithRankAndDangling candidate) { + return this.reference == candidate.getVertexID(); + } + + @Override + public int compareToReference(VertexWithRankAndDangling candidate) { + long diff = candidate.getVertexID() - this.reference; + return diff < 0 ? -1 : diff > 0 ? 1 : 0; + } + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java new file mode 100644 index 00000000000..7563d304bf0 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java @@ -0,0 +1,87 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; + +import eu.stratosphere.pact.generic.types.TypeComparator; +import eu.stratosphere.pact.generic.types.TypePairComparator; +import eu.stratosphere.pact.generic.types.TypePairComparatorFactory; + + +/** + * + */ +public class VertexWithRankDanglingToVertexWithRankPairComparatorFactory + implements TypePairComparatorFactory +{ + + @Override + public VertexWithRankDanglingToVertexWithRankComparator createComparator12( + TypeComparator comparator1, TypeComparator comparator2) + { + return new VertexWithRankDanglingToVertexWithRankComparator(); + } + + @Override + public VertexWithRankToVertexWithRankDanglingPairComparator createComparator21( + TypeComparator comparator1, TypeComparator comparator2) + { + return new VertexWithRankToVertexWithRankDanglingPairComparator(); + } + + + public static final class VertexWithRankDanglingToVertexWithRankComparator + implements TypePairComparator + { + private long reference; + + @Override + public void setReference(VertexWithRankAndDangling reference) { + this.reference = reference.getVertexID(); + } + + @Override + public boolean equalToReference(VertexWithRank candidate) { + return this.reference == candidate.getVertexID(); + } + + @Override + public int compareToReference(VertexWithRank candidate) { + long diff = candidate.getVertexID() - this.reference; + return diff < 0 ? -1 : diff > 0 ? 1 : 0; + } + } + + public static final class VertexWithRankToVertexWithRankDanglingPairComparator + implements TypePairComparator + { + private long reference; + + @Override + public void setReference(VertexWithRank reference) { + this.reference = reference.getVertexID(); + } + + @Override + public boolean equalToReference(VertexWithRankAndDangling candidate) { + return this.reference == candidate.getVertexID(); + } + + @Override + public int compareToReference(VertexWithRankAndDangling candidate) { + long diff = candidate.getVertexID() - this.reference; + return diff < 0 ? -1 : diff > 0 ? 1 : 0; + } + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankSerializer.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankSerializer.java similarity index 72% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankSerializer.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankSerializer.java index fc62f2eb994..3bb86636c4a 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankSerializer.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankSerializer.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -24,16 +24,16 @@ import eu.stratosphere.pact.generic.types.TypeSerializer; /** * */ -public final class NodeWithRankSerializer implements TypeSerializer { +public final class VertexWithRankSerializer implements TypeSerializer { @Override - public NodeWithRank createInstance() { - return new NodeWithRank(); + public VertexWithRank createInstance() { + return new VertexWithRank(); } @Override - public NodeWithRank createCopy(NodeWithRank from) { - NodeWithRank n = new NodeWithRank(); + public VertexWithRank createCopy(VertexWithRank from) { + VertexWithRank n = new VertexWithRank(); copyTo(from, n); return n; } @@ -42,8 +42,8 @@ public final class NodeWithRankSerializer implements TypeSerializer { +public final class VertexWithRankSerializerFactory implements TypeSerializerFactory { - private static final NodeWithRankSerializer INSTANCE = new NodeWithRankSerializer(); + private static final VertexWithRankSerializer INSTANCE = new VertexWithRankSerializer(); @Override public void writeParametersToConfig(Configuration config) {} @@ -31,12 +31,12 @@ public final class NodeWithRankSerializerFactory implements TypeSerializerFactor public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithRankSerializer getSerializer() { + public VertexWithRankSerializer getSerializer() { return INSTANCE; } @Override - public Class getDataType() { - return NodeWithRank.class; + public Class getDataType() { + return VertexWithRank.class; } } diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/iterative/DanglingPageRankITCase.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/iterative/DanglingPageRankITCase.java index a466d52f04a..5de5b2db6d4 100644 --- a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/iterative/DanglingPageRankITCase.java +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/iterative/DanglingPageRankITCase.java @@ -24,6 +24,7 @@ import org.junit.runners.Parameterized.Parameters; import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.CustomCompensatableDanglingPageRank; import eu.stratosphere.pact.test.util.TestBase; @RunWith(Parameterized.class) @@ -102,7 +103,7 @@ public class DanglingPageRankITCase extends TestBase { "0" }; - return CompensatableDanglingPageRank.getJobGraph(parameters); + return CustomCompensatableDanglingPageRank.getJobGraph(parameters); } -- GitLab