zip
def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]
The zip function combines two RDDs into an RDD of key/value pairs. It assumes that the two RDDs have both the same number of partitions and the same number of elements; otherwise an exception is thrown.
def main(args: Array[String]): Unit = {
  // default parallelism of 12
  val sc = new SparkContext(new SparkConf()
    .setMaster("local")
    .setAppName("test")
    .set("spark.default.parallelism", "12"))
  var rdd1 = sc.makeRDD(1 to 5, 2)
  var rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
  var rdd3 = rdd1.zip(rdd2)
  println("RDD partitions size:" + rdd3.partitions.size)
  rdd3.collect.foreach(println(_))
}
16/12/20 11:30:42 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:25, took 0.680351 s
(1,A)
(2,B)
(3,C)
(4,D)
(5,E)
16/12/20 11:30:42 INFO SparkContext: Invoking stop() from shutdown hook
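As a side note not covered by the example above, here is a minimal sketch of the failure case (the object name and data are made up for illustration): when the per-partition element counts differ, the job fails with a SparkException once the zipped RDD is actually computed.

import org.apache.spark.{SparkConf, SparkContext, SparkException}

object ZipMismatchTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("zip-mismatch"))
    // 6 elements vs. 5 elements, both split into 2 partitions
    val rdd1 = sc.makeRDD(1 to 6, 2)
    val rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
    try {
      // zip itself is lazy; the element-count check runs with the job,
      // so the exception surfaces here at collect time
      rdd1.zip(rdd2).collect().foreach(println)
    } catch {
      case e: SparkException => println("zip failed: " + e.getMessage)
    } finally {
      sc.stop()
    }
  }
}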
zipPartitions
The zipPartitions function combines multiple RDDs into a new RDD partition by partition. The RDDs being combined must have the same number of partitions, but there is no requirement on the number of elements within each partition.
This function has several overloads, which fall into three groups:
- Taking one RDD as a parameter
def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]
The only difference between these two overloads is the preservesPartitioning parameter, which controls whether the partitioner of the parent RDD is kept on the result (a sketch of that variant follows the example output below).
The mapping function f takes the iterators of the two RDDs as its parameters.
def main(args: Array[String]): Unit = { //默认分区12个 val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12")) var rdd1 = sc.makeRDD(1 to 10, 2) var rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2) var rdd3 = rdd1.zipPartitions(rdd2) { (rdd1Iter, rdd2Iter) => { var result = List[String]() while (rdd1Iter.hasNext && rdd2Iter.hasNext) { result ::= (rdd1Iter.next() + "_" + rdd2Iter.next()) } result.iterator } }.collect.foreach(println(_))}
16/12/20 15:02:55 INFO DAGScheduler: ResultStage 0 (collect at ShellTest.scala:31) finished in 0.239 s
16/12/20 15:02:55 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:31, took 0.699322 s
2_B
1_A
8_E
7_D
6_C
16/12/20 15:02:55 INFO SparkContext: Invoking stop() from shutdown hook
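The preservesPartitioning variant is not demonstrated above; the following is a minimal sketch (object name, key/value data set, and partitioner are made up for illustration) showing that with preservesPartitioning = true the zipped RDD keeps the partitioner of the first parent RDD.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object ZipPartitionsPreserveTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
    // a key/value RDD with an explicit HashPartitioner (2 partitions)
    val rdd1 = sc.makeRDD(1 to 4).map(i => (i, i * 10)).partitionBy(new HashPartitioner(2))
    val rdd2 = sc.makeRDD(Array("a", "b", "c", "d"), 2)
    // preservesPartitioning = true keeps rdd1's partitioner on the result
    val zipped = rdd1.zipPartitions(rdd2, preservesPartitioning = true) {
      (iter1, iter2) => iter1.zip(iter2)
    }
    println("partitioner preserved: " + zipped.partitioner.isDefined) // expected: true
    zipped.collect.foreach(println)
    sc.stop()
  }
}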
- Taking two RDDs as parameters
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]
Usage is the same as above, except that this variant takes two RDDs as parameters, so the mapping function f receives three iterators: one for this RDD and one for each of the two parameter RDDs.
def main(args: Array[String]): Unit = { //默认分区12个 val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12")) var rdd1 = sc.makeRDD(1 to 10, 2) var rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2) var rdd3 = sc.makeRDD(Array('a', 'b', 'c', 'd', 'e'), 2) var rdd4 = rdd1.zipPartitions(rdd2, rdd3) { (rdd1Iter, rdd2Iter, rdd3Iter) => { var result = List[String]() while (rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) { result ::= (rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next()) } result.iterator } }.collect.foreach(println(_))}
16/12/20 15:06:21 INFO DAGScheduler: ResultStage 0 (collect at ShellTest.scala:32) finished in 0.260 s
16/12/20 15:06:21 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:32, took 0.816710 s
2_B_b
1_A_a
8_E_e
7_D_d
6_C_c
16/12/20 15:06:21 INFO SparkContext: Invoking stop() from shutdown hook
- Taking three RDDs as parameters
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]
Usage is the same as above; this variant simply takes one more RDD (a sketch follows below).
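A minimal sketch of this overload in the same style as the earlier examples; the object name and the fourth data set (rdd4) are made up for illustration. The mapping function f now receives four iterators: one for this RDD plus one for each of the three parameter RDDs.

import org.apache.spark.{SparkConf, SparkContext}

object ZipPartitionsThreeArgTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
    val rdd1 = sc.makeRDD(1 to 10, 2)
    val rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)
    val rdd3 = sc.makeRDD(Array('a', 'b', 'c', 'd', 'e'), 2)
    val rdd4 = sc.makeRDD(Array("x1", "x2", "x3", "x4", "x5"), 2)
    // f receives one iterator per RDD: this RDD plus the three arguments
    rdd1.zipPartitions(rdd2, rdd3, rdd4) {
      (iter1, iter2, iter3, iter4) =>
        var result = List[String]()
        while (iter1.hasNext && iter2.hasNext && iter3.hasNext && iter4.hasNext) {
          result ::= (iter1.next() + "_" + iter2.next() + "_" + iter3.next() + "_" + iter4.next())
        }
        result.iterator
    }.collect.foreach(println(_))
    sc.stop()
  }
}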