SpatialSpark源码阅读

构建spatialRDD

基于key-value形式的RDD生成SpatialRDD

类SpatialRDD继承自spark-core的RDD
spatialRDD.png
伴生对象SpatialRDD调用updatable函数对k-v RDD进行构造。其中对重复key进行了任意合并,具体怎么实现呢???定义了函数z和f,而函数z和f在elems上的应用则是在再次调用的updatable中定义,该现象为scala高阶函数2中的一个函数作为另一个函数的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Constructs an updatable IndexedRDD from an RDD of pairs, merging duplicate keys arbitrarily. 任意合并重复key怎么体现???
*/
def updatable[K: ClassTag, V: ClassTag](elems: RDD[(K, V)]): SpatialRDD[K, V] = updatable[K, V, V](elems, z = (id, a) => a, f = (id, a, b) => b)
//定义了函数z和f,而函数z和f在elems上的应用则是在updatable中定义,该现象为高阶函数中的一个函数作为另一个函数的参数

/**
* Constructs an SpatialRDD from an RDD of pairs.
* the default partitioner is the quadtree based partioner
*/
def updatable[K: ClassTag, U: ClassTag, V: ClassTag](elems: RDD[(K, V)], z: (K, U) => V, f: (K, V, U) => V): SpatialRDD[K, V] = {
val elemsPartitioned = elems.partitionBy(new QtreePartitioner(Util.numPartition, Util.sampleRatio, elems))
val partitions = elemsPartitioned.mapPartitions[SpatialRDDPartition[K, V]](
iter => Iterator(RtreePartition(iter, z, f)),
preservesPartitioning = true)
new SpatialRDD(partitions)
}

在对key-valueRDD进行数据切割的时候,重新定义了一个分区函数QtreePartitioner,类似于spark原生的HashPatitionner(哈希分区)和RangePatitioner(区域分区),既决定了RDD本身的分区数量,也可以作为其父RDD Shuffle输出(MapOutput)中每个分区进行数据切割的依据。

新的分区函数QtreePartitioner

Spark内部提供了HashPartitionerRangePartitioner两种分区策略,这两种分区策略在很多情况下都适合我们的场景。但是有些情况下,Spark内部不能符合需求,这时候就可以自定义分区策略。为此,Spark提供了相应的接口,我们只需要扩展Partitioner抽象类,然后实现里面的三个方法:

1
2
3
4
5
6
7
8
9
10
package org.apache.spark

/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
*/
abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}

def numPartitions: Int:这个方法需要返回你想要创建分区的个数;
def getPartition(key: Any): Int:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1分区ID相同的数据元素将被分配到同一个数据分片中
equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样。Spark自定义分区(Partitioner)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
val quadtree: QtreeForPartion = {

val total = rdd.count()

// 不同数据量的采样比例不同
val fraction2 = if (total * fraction > 5e5) (5e5 / total).toFloat else fraction

//对key值进行采样:true 泊松采样,有放回抽样; false 伯努利采样,无放回抽样;fraction 抽样比例
var sampleData = rdd.map(_._1).sample(false, fraction2).collect()

//in case the sample data size is too small,expand the sample ratio 50 times.
if (sampleData.length < 10000) {
sampleData = rdd.map(_._1).sample(false, 0.2).collect()
}

//整数除法:每个数据分片分配的抽样数据个数,即叶子节点的大小,是否每个数据分片都能够有数据
var leafBound = sampleData.length / partitions

if (leafBound == 0) {
leafBound = qtreeUtil.leafbound
}

val qtree = new QtreeForPartion(leafBound)

// 为什么只有采样数据???
sampleData.foreach {
case p: Point =>
qtree.insertPoint(p)

case _ => println("do not support this data type")
}

realnumPartitions = qtree.computePIDofLeaf(sampleData.length, partitions)
//println("bound "+leafbound)
//qtree.printTreeStructure()

qtree
}
Spark自定义分区(Partitioner). https://www.iteblog.com/archives/1368.html