spark graphx写出二跳邻居以下求解过程的代码求解过程 先构造一个属性图,每个顶点的属性 Attr 为 Map(dstId->distance),初始化为 Map( 该顶点 ID->0) 。然后进行两次迭代求解二度关系. 第一次迭代 : 遍历每条边,将 dst 顶点属性 dstAttr 中的跳数字段标记为 1 发给 src 顶点, src 收到后合并到顶点属性 srcAttr 里. 第二次迭代 : 遍历边筛选出 dstAttr 里面跳数为 1 的 Key-Value 发给对应的 src 顶点,并将 dstId 加入桥梁顶点,最后聚合这些消息得到所有 2 跳邻居。
时间: 2024-03-05 13:54:44 浏览: 19
以下是用 Spark GraphX 求解二跳邻居的代码,包含了对应的求解过程:
```scala
import org.apache.spark.graphx._
// 构造图
val vertices: RDD[(VertexId, Map[VertexId, Double])] = ???
val edges: RDD[Edge[Int]] = ???
val defaultVertexAttr: Map[VertexId, Double] = Map.empty[VertexId, Double].withDefaultValue(Double.MaxValue)
val graph = Graph(vertices, edges, defaultVertexAttr)
// 第一次迭代
val firstIterGraph = graph.mapTriplets(triplet => {
// 将 dst 顶点属性中跳数字段标记为 1,发给 src 顶点
Iterator((triplet.srcId, Map(triplet.dstId -> 1.0)))
}).aggregateMessages[Map[VertexId, Double]](
// 将收到的跳数为 1 的顶点属性合并到 src 顶点属性中
triplet => Iterator((triplet.srcId, triplet.attr)),
(a, b) => a ++ b
).joinVertices(graph.vertices) {
// 将收到的跳数为 1 的顶点属性合并到顶点属性中
(_, attr, srcAttr) => srcAttr ++ attr
}
// 第二次迭代
val secondIterGraph = firstIterGraph.aggregateMessages[Map[VertexId, Double]](
triplet => {
// 筛选出 dstAttr 中跳数为 1 的 Key-Value,发给对应的 src 顶点
if (triplet.attr.getOrElse(triplet.dstId, Double.MaxValue) == 1.0) {
Iterator((triplet.srcId, Map(triplet.dstId -> 2.0)), (triplet.dstId, Map(triplet.srcId -> 2.0)))
} else {
Iterator.empty
}
},
(a, b) => {
// 聚合这些消息得到所有 2 跳邻居
a ++ b
}
).joinVertices(firstIterGraph.vertices) {
// 将桥梁顶点加入顶点属性中
(_, attr, srcAttr) => srcAttr ++ attr
}
// 输出结果
secondIterGraph.vertices.foreach(println)
```
求解过程如下:
1. 初始化每个顶点的属性为 Map(id -> 0.0),即距离为 0。
2. 第一次迭代:遍历每条边,将 dst 顶点属性中跳数字段标记为 1 发给 src 顶点,src 收到后合并到顶点属性 srcAttr 里。
3. 第二次迭代:遍历边,筛选出 dstAttr 里面跳数为 1 的 Key-Value 发给对应的 src 顶点,并将 dstId 加入桥梁顶点,最后聚合这些消息得到所有 2 跳邻居。