spark graphx写出二跳邻居以下求解过程的代码求解过程 先构造一个属性图,每个顶点的属性 Attr 为 Map(dstId->distance),初始化为 Map( 该顶点 ID->0) 。然后进行两次迭代求解二度关系. 第一次迭代 : 遍历每条边,将 dst 顶点属性 dstAttr 中的跳数字段标记为 1 发给 src 顶点, src 收到后合并到顶点属性 srcAttr 里. 第二次迭代 : 遍历边筛选出 dstAttr 里面跳数为 1 的 Key-Value 发给对应的 src 顶点,并将 dstId 加入桥梁顶点,最后聚合这些消息得到所有 2 跳邻居。
时间: 2024-03-05 11:54:22 浏览: 63
下面是Spark GraphX实现二跳邻居的代码:
```scala
import org.apache.spark.graphx._
val graph: Graph[Double, Int] = ...
// 初始化每个顶点的属性
val initialGraph = graph.mapVertices((id, _) => Map(id -> 0.0))
// 第一次迭代
val firstIterGraph = initialGraph.mapTriplets(triplet => {
// 将dst顶点属性中跳数字段标记为1,发给src顶点
Iterator((triplet.srcId, Map(triplet.dstId -> 1.0)))
})
// 第二次迭代
val secondIterGraph = firstIterGraph.aggregateMessages[Map[VertexId, Double]](
triplet => {
// 筛选出dstAttr里跳数为1的Key-Value,发给对应的src顶点
if (triplet.attr.getOrElse(triplet.dstId, Double.MaxValue) == 1.0) {
Iterator((triplet.srcId, Map(triplet.dstId -> 2.0)), (triplet.dstId, Map(triplet.srcId -> 2.0)))
} else {
Iterator.empty
}
},
(a, b) => {
// 聚合这些消息得到所有2跳邻居
a ++ b
}
)
// 输出结果
secondIterGraph.vertices.foreach(println)
```
这段代码的具体实现过程如下:
首先,我们定义了一个属性图 `graph`,每个顶点的属性 `Attr` 为 `Map(dstId->distance)`,初始化为 `Map(该顶点ID->0)`。
然后,我们进行两次迭代求解二度关系。第一次迭代:遍历每条边,将 `dst` 顶点属性 `dstAttr` 中的跳数字段标记为1发给 `src` 顶点,`src` 收到后合并到顶点属性 `srcAttr` 里。第二次迭代:遍历边筛选出 `dstAttr` 里面跳数为 1 的 Key-Value 发给对应的 `src` 顶点,并将 `dstId` 加入桥梁顶点,最后聚合这些消息得到所有 2 跳邻居。
最后,我们输出计算出来的二跳邻居结果。
阅读全文