图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依赖于自己邻居的属性。所以许多重要的图算法都是迭代的重新计算每个顶点的属性,直到满足某个确定的条件。
一系列的图并发(graph-parallel
)抽象已经被提出来用来表达这些迭代算法。GraphX
公开了一个类似Pregel
的操作,它是广泛使用的Pregel
和GraphLab
抽象的一个融合。
GraphX
中实现的这个更高级的Pregel
操作是一个约束到图拓扑的批量同步(bulk-synchronous
)并行消息抽象。Pregel
操作者执行一系列的超步(super steps
),在这些步骤中,顶点从
之前的超步中接收进入(inbound
)消息的总和,为顶点属性计算一个新的值,然后在以后的超步中发送消息到邻居顶点。不像Pregel
而更像GraphLab
,消息通过边triplet
的一个函数被并行计算,
消息的计算既会访问源顶点特征也会访问目的顶点特征。在超步中,没有收到消息的顶点会被跳过。当没有消息遗留时,Pregel
操作停止迭代并返回最终的图。
注意,与标准的Pregel
实现不同的是,GraphX
中的顶点仅仅能发送信息给邻居顶点,并且可以利用用户自定义的消息函数并行地构造消息。这些限制允许对GraphX
进行额外的优化。
下面的代码是pregel
的具体实现。
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
(graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] =
{
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// 计算消息
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
// 迭代
var prevG: Graph[VD, ED] = null
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// 接收消息并更新顶点
prevG = g
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// 发送新消息
messages = g.mapReduceTriplets(
sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
activeMessages = messages.count()
i += 1
}
g
}
Pregel
计算模型中有三个重要的函数,分别是vertexProgram
、sendMessage
和messageCombiner
。
-
vertexProgram
:用户定义的顶点运行程序。它作用于每一个顶点,负责接收进来的信息,并计算新的顶点值。 -
sendMsg
:发送消息 -
mergeMsg
:合并消息
我们具体分析它的实现。根据代码可以知道,这个实现是一个迭代的过程。在开始迭代之前,先完成一些初始化操作:
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// 计算消息
var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
var activeMessages = messages.count()
程序首先用vprog
函数处理图中所有的顶点,生成新的图。然后用生成的图调用聚合操作(mapReduceTriplets
,实际的实现是我们前面章节讲到的aggregateMessagesWithActiveSet
函数)获取聚合后的消息。
activeMessages
指messages
这个VertexRDD
中的顶点数。
下面就开始迭代操作了。在迭代内部,分为二步。
- 1 接收消息,并更新顶点
g = g.joinVertices(messages)(vprog).cache()
//joinVertices的定义
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
: Graph[VD, ED] = {
val uf = (id: VertexId, data: VD, o: Option[U]) => {
o match {
case Some(u) => mapFunc(id, data, u)
case None => data
}
}
graph.outerJoinVertices(table)(uf)
}
这一步实际上是使用outerJoinVertices
来更新顶点属性。outerJoinVertices
在关联操作中有详细介绍。
- 2 发送新消息
messages = g.mapReduceTriplets(
sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
注意,在上面的代码中,mapReduceTriplets
多了一个参数Some((oldMessages, activeDirection))
。这个参数的作用是:它使我们在发送新的消息时,会忽略掉那些两端都没有接收到消息的边,减少计算量。
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// 初始化图
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a,b) => math.min(a,b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
上面的例子中,Vertex Program
函数定义如下:
(id, dist, newDist) => math.min(dist, newDist)
这个函数的定义显而易见,当两个消息来的时候,取它们当中路径的最小值。同理Merge Message
函数也是同样的含义。
Send Message
函数中,会首先比较triplet.srcAttr + triplet.attr
和triplet.dstAttr
,即比较加上边的属性后,这个值是否小于目的节点的属性,如果小于,则发送消息到目的顶点。
【1】spark源码