// aggregate with zero value 5 (z's two partitions hold (1, 2, 3) and (4, 5, 6)):
//   partition 0: seqOp = max, folded starting from 5 -> max(5, 1, 2, 3) = 5
//   partition 1: seqOp = max, folded starting from 5 -> max(5, 4, 5, 6) = 6
//   combine:     combOp = _ + _, folded starting from 5 again -> 5 + 5 + 6 = 16
// Note: in aggregate the zero value also takes part in the final combine step,
// not only once per partition.
z.aggregate(5)(math.max(_, _), _ + _)
结果: Int = 16
// Six one-letter strings in 2 partitions; per the mapPartitionsWithIndex output,
// a, b, c land in partition 0 and d, e, f in partition 1.
val z = sc.parallelize(List("a","b","c","d","e","f"),2)
// Tags every element with the index of the partition it belongs to,
// e.g. "[partID:0, val: a]". Intended for rdd.mapPartitionsWithIndex.
// The partition iterator is mapped lazily instead of first being copied
// into a List, so a large partition is never fully materialized in memory.
def myfunc(index: Int, iter: Iterator[String]): Iterator[String] =
  iter.map(x => s"[partID:$index, val: $x]")
// Show which partition each element landed in.
z.mapPartitionsWithIndex(myfunc).collect
结果:Array[String] = Array([partID:0, val: a], [partID:0, val: b], [partID:0, val: c], [partID:1, val: d], [partID:1, val: e], [partID:1, val: f])
// Zero value "" with concatenation as both seqOp and combOp: each partition
// concatenates its elements ("abc", "def"), then the partition results are concatenated.
// NOTE(review): the partition results are not guaranteed to be combined in partition
// order (the "x" example shows partition 1 first), so "defabc" is also possible.
z.aggregate("")(_ + _, _+_)
结果:String = abcdef
// Zero value "x": it is prepended once per partition ("xabc", "xdef") and once more
// by the final combine, so three x's appear in the result. The recorded output
// "xxdefxabc" shows partition 1's result was combined before partition 0's; the
// order in which partition results reach combOp is not fixed.
z.aggregate("x")(_ + _, _+_)
结果:String = xxdefxabc
// seqOp replaces the running String with the larger of the two lengths, as a String.
// Assuming the even split ("12", "23") / ("345", "4567"):
//   partition 0: max(0, 2) = "2", then max("2".length = 1, 2) = "2"
//   partition 1: max(0, 3) = "3", then max("3".length = 1, 4) = "4"
// combOp concatenates "" with the partition results -> "24" or "42",
// depending on which partition result is combined first.
val z = sc.parallelize(List("12","23","345","4567"),2)
z.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)
结果:String = 42
// Like the max example, but with min:
//   partition ("12", "23"):     min(0, 2) = "0", then min("0".length = 1, 2) = "1"
//   partition ("345", "4567"):  min(0, 3) = "0", then min("0".length = 1, 4) = "1"
// Concatenated -> "11".
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
结果: String = 11
// The empty string changes the second partition's result:
//   partition ("12", "23"):  min(0, 2) = "0", then min(1, 2) = "1"
//   partition ("345", ""):   min(0, 3) = "0", then min("0".length = 1, 0) = "0"
// Concatenated -> "10" (or "01" if the second partition's result is combined first).
val z = sc.parallelize(List("12","23","345",""),2)
z.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)
结果: String = 10
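aggregateByKey(zeroValue)(seqOp, combOp)
作用与 aggregate 类似，只是按 key 分别聚合：在每个分区内，对相同 key 的 value 从初始值 zeroValue 开始用 seqOp 累加；再用 combOp 合并各分区中同一 key 的结果。与 aggregate 不同的是，初始值只在分区内使用，不会在最后的合并阶段再次参与计算。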
// (key, value) pairs in 2 partitions; the mapPartitionsWithIndex call shows the layout.
val pairRDD = sc.parallelize(List( ("cat",2), ("cat", 5), ("mouse", 4),("cat", 12), ("dog", 12),
("mouse", 2)), 2)
// Tags every (key, value) pair with the index of the partition it belongs to,
// e.g. "[partID:0, val: (cat,2)]". Intended for rdd.mapPartitionsWithIndex.
// The partition iterator is mapped lazily instead of first being copied
// into a List, so a large partition is never fully materialized in memory.
def myfunc(index: Int, iter: Iterator[(String, Int)]): Iterator[String] =
  iter.map(x => s"[partID:$index, val: $x]")
// Show which partition each (key, value) pair landed in.
pairRDD.mapPartitionsWithIndex(myfunc).collect
结果:Array[String] = Array([partID:0, val: (cat,2)], [partID:0, val: (cat,5)], [partID:0, val: