GraphX 是 Spark 图表和图形并行计算的新组件。GraphX 延伸 Spark RDD 通过引入新的图形的抽象:计算与连接到每个顶点和边缘性的向量。以支持图形计算,GraphX 公开了一组基本的操作符(例如 subgraph, joinVertices 和 aggregateMessages)以及一个优化高阶API。此外,GraphX 包括的图形越来越多的收集 algorithms 和 builders ,以简化图形分析任务。
GraphX 在Spark 1.2 中的API面向用户做很少改变:
为了提高性能,我们已经推出了新版本 mapReduceTriplets
叫做 aggregateMessages
这需要从前面返回的消息 mapReduceTriplets
(通过回调EdgeContext
返回值),而不是。我们不支持 mapReduceTriplets
并鼓励用户详细的阅读变更指南。
在 Spark 1.0 和 1.1 中 EdgeRDD
从 EdgeRDD[ED]
至 EdgeRDD[ED, VD]
优化了一些缓存。我们已经发现,因为一个更优雅的解决方案,并已更自然的恢复类型签名到 EdgeRDD[ED]
类型。
开始时,首先需要在项目中导入 Spark和 GraphX 的依赖,具体如下:
import org.apache.spark._ import org.apache.spark.graphx._ // To make some of the examples work we will also need RDD import org.apache.spark.rdd.RDD
如果不使用 Spark shell,你还需要一个 SparkContext
。要了解更多关于 Spark 参考 Spark 快速入门指南。
该属性是一个每个顶点和边缘用户定义的对象的有向图。一个向图是一个由边缘和点计算组成的关系图。支持平行边缘简化了建模场景,其中可以有相同的顶点之间的多个关系(例如,同事和朋友)。每个顶点由唯一的一个64 位标识符键值(VertexID
)。GraphX 不会对顶点标识符的任何排序约束。同样,边缘具有相应的源和目的地顶点标识符。
该物理图形参数是顶点(VD
)和边缘(ED
)类型。这些类型是分别与每个顶点和边相关联的对象
GraphX 优化顶点和边的类型的表达,当他们的原始数据类型(例如,int,double 等等)通过将它们存储在专有的阵列,可以减小内存占用。
在某些情况下,可能希望具有不同的属性类型的顶点在相同的图中。这可以通过继承来完成。例如,以用户和产品型号为两个图,我们不妨做到以下几点:
class VertexProperty() case class UserProperty(val name: String) extends VertexProperty case class ProductProperty(val name: String, val price: Double) extends VertexProperty // The graph might then have the type: var graph: Graph[VertexProperty, String] = null
类似 RDD数据集合,图的产生是通过分布式和容错,并且不可变的。更改值或图形的结构是由生产具有所期望的变化的新图形来实现的。注意原始图主要部分(即,未受影响的结构,属性和索引)被重用,在新图形减少这个固有功能数据结构的属性。图表跨采用一系列顶点划分发执行。与RDD数据集合不同,图中的每个分区可以在不同的机器上在发生故障的情况下重新创建。
按道理属性图对应的一对类型的集合(RDD)的编码为每个顶点和边的属性。其结果是,该图类包含的成员访问该图的顶点和边:
class Graph[VD, ED] { val vertices: VertexRDD[VD] val edges: EdgeRDD[ED] }
这些类 VertexRDD[VD]
和 EdgeRDD[ED]
延伸,并分别经过优化的 RDD[(VertexID, VD)]
和
RDD[Edge[ED]]
。二者 VertexRDD[VD]
和 EdgeRDD[ED]
提供每个图的计算和优化内部内置附加功能。我们在上一节更详细的讨论 VertexRDD
和 EdgeRDD
API ( 顶点和边RDDS) ,现在他们可以被看作仅仅是形式的RDD集合: RDD[(VertexID, VD)]
和 RDD[Edge[ED]]
。
假设我们想要构建每一个 GraphX 并组成一个图。 顶点属性可能包含用户名和职业。 我们可以注明用户和职业之间的关系:
生成的 graph 类型申明:
val userGraph: Graph[(String, String), String]
从原始文件有许多方法来构造一个属性图,抽样,甚至综合的,这些部分在 graph builders 里有更详细地讨论 。 最通用的方法是使用 Graph对象 。 例如下面的代码是从抽样的集合构造图:
// Assume the SparkContext has already been constructed val sc: SparkContext // Create an RDD for the vertices val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))) // Create an RDD for edges val relationships: RDD[Edge[String]] = sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"), Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"))) // Define a default user in case there are relationship with missing user val defaultUser = ("John Doe", "Missing") // Build the initial Graph val graph = Graph(users, relationships, defaultUser)
在上面的例子中,我们使用 edge
类。 边 (Edge) 有一个 srcId
和一个 dstId
源和目标顶点对应标识符。 此外, attr是一个
属性。 边缘
我们可以把图分解为相应的顶点和边 graph.vertices
和 graph.edges
val graph: Graph[(String, String), String] // Constructed from above // Count all users which are postdocs graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count // Count all the edges where src > dst graph.edges.filter(e => e.srcId > e.dstId).count
注意
graph.vertices
返回一个VertexRDD[(字符串,字符串)]
延伸((VertexID(String,String)))
所以我们使用 scala解析
元组。 另一方面,graph.edges
返回一个EdgeRDD
包含边缘(字符串)
对象。 我们也可以使用案例类型构造函数如以下:
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
除了属性图的顶点和边,GraphX 也暴露了三个一组的观点。 通过点、边、边属性,这三种属性抽样(EdgeTriplet[VD ED]]
包含的实例 EdgeTriplet
类。 也可以用SQL表达式表示:
SELECT src.id, dst.id, src.attr, e.attr, dst.attr FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst ON e.srcId = src.Id AND e.dstId = dst.Id
或图形:
EdgeTriplet
类通过添加 srcAttr
和 dstAttr
成员扩展了 边缘
类,分别包含源和目标属性。 我们可以使用 三个一组的一个图形来呈现一个字符串集合描述用户之间的关系。
val graph: Graph[(String, String), String] // Constructed from above // Use the triplets view to create an RDD of facts. val facts: RDD[String] = graph.triplets.map(triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1) facts.collect.foreach(println(_))
正如 rdds 有基本的map, filter, reduceByKey
操作 ,操作图的属性也是对集合的操作,改变性质和结构,通过操作用户自定义函数产生新的图形。对图的属性的操作定义为 GraphOps
。 然而,由于值得一提的是Scala 对 GraphOps
会自动提供的成员属性图
。 例如,我们可以 计算每个顶点的入度(在GraphOps中定义)如以下:
val graph: Graph[(String, String), String] // Use the implicit GraphOps.inDegrees operator val inDegrees: VertexRDD[Int] = graph.inDegrees
区分核心图形操作的原因 GraphOps
是 在未来能够支持不同的图形表示。 每个图表示必须 提供核心业务和重用的实现中定义的许多有用的操作 GraphOps
。
下面是 Graph
和 GraphOps的
快速摘要以及 graph的简单成员。请注意,某些功能的签名已被简化(如默认参数和去掉了类型限制)和一些更高级的功能已被删除,请参考正式的API文档操作名单。
/** Summary of the functionality in the property graph */ class Graph[VD, ED] { // Information about the Graph =================================================================== val numEdges: Long val numVertices: Long val inDegrees: VertexRDD[Int] val outDegrees: VertexRDD[Int] val degrees: VertexRDD[Int] // Views of the graph as collections ============================================================= val vertices: VertexRDD[VD] val edges: EdgeRDD[ED] val triplets: RDD[EdgeTriplet[VD, ED]] // Functions for caching graphs ================================================================== def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] def cache(): Graph[VD, ED] def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] // Change the partitioning heuristic ============================================================ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] // Transform vertex and edge attributes ========================================================== def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]) : Graph[VD, ED2] // Modify the graph structure ==================================================================== def reverse: Graph[VD, ED] def subgraph( epred: EdgeTriplet[VD,ED] => Boolean = (x => true), vpred: (VertexID, VD) => Boolean = ((v, d) => true)) : Graph[VD, ED] def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] // Join RDDs with the graph ====================================================================== def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED] def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)]) (mapFunc: (VertexID, VD, Option[U]) => VD2) : Graph[VD2, ED] // Aggregate information about adjacent triplets ================================================= def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] def aggregateMessages[Msg: ClassTag]( sendMsg: EdgeContext[VD, ED, Msg] => Unit, mergeMsg: (Msg, Msg) => Msg, tripletFields: TripletFields = TripletFields.All) : VertexRDD[A] // Iterative graph-parallel computation ========================================================== def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)( vprog: (VertexID, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], mergeMsg: (A, A) => A) : Graph[VD, ED] // Basic graph algorithms ======================================================================== def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] def connectedComponents(): Graph[VertexID, ED] def triangleCount(): Graph[Int, ED] def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] }
类似操作RDD 的 map 一样,graphx 的属性包含以下内容:
class Graph[VD, ED] { def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] }
这里每一个操作产生一个新图,其顶点和边被用户定义的map函数修改了。
注意:在每一个实例图结构不受影响。这是这些操作的关键特征,这允许结果图重复利用原始图的结构索引。下面的代码片段逻辑上是等同的,但是第一个没有保存结构索引,其不会从GraphX系统优化中获益:
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) } val newGraph = Graph(newVertices, graph.edges)
代替,使用mapVertices保护结构索引:
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))这些操作经常用来初始化图为了进行特殊计算或者排除不需要的属性。例如,给定一个图,它的出度作为顶点属性(之后描述如何构建这样一个图),我们初始化它为PageRank:
// Given a graph where the vertex property is the out degree val inputGraph: Graph[Int, String] = graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0)) // Construct a graph where each edge contains the weight // and each vertex is the initial PageRank val outputGraph: Graph[Double, Double] = inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)