博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark算子:RDD基本转换操作(6)–zip、zipPartitions
阅读量:6428 次
发布时间:2019-06-23

本文共 4080 字,大约阅读时间需要 13 分钟。

hot3.png

zip

def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

def main(args: Array[String]): Unit = {  //默认分区12个  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))  var rdd1 = sc.makeRDD(1 to 5,2)  var rdd2 = sc.makeRDD(Array('A','B','C','D','E'),2)  var rdd3 = rdd1.zip(rdd2)  println("RDD partitions size:" + rdd3.partitions.size)  rdd3.collect.foreach(println(_))}

 

16/12/20 11:30:42 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:25, took 0.680351 s

(1,A)
(2,B)
(3,C)
(4,D)
(5,E)
16/12/20 11:30:42 INFO SparkContext: Invoking stop() from shutdown hook

 

zipPartitions

zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。

该函数有好几种实现,可分为三类:

  • 参数是一个RDD

def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息

映射方法f参数为两个RDD的迭代器。

def main(args: Array[String]): Unit = {  //默认分区12个  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))  var rdd1 = sc.makeRDD(1 to 10, 2)  var rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)  var rdd3 = rdd1.zipPartitions(rdd2) {    (rdd1Iter, rdd2Iter) => {      var result = List[String]()      while (rdd1Iter.hasNext && rdd2Iter.hasNext) {        result ::= (rdd1Iter.next() + "_" + rdd2Iter.next())      }      result.iterator    }  }.collect.foreach(println(_))}

16/12/20 15:02:55 INFO DAGScheduler: ResultStage 0 (collect at ShellTest.scala:31) finished in 0.239 s

16/12/20 15:02:55 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:31, took 0.699322 s
2_B
1_A
8_E
7_D
6_C
16/12/20 15:02:55 INFO SparkContext: Invoking stop() from shutdown hook

 

  • 参数是两个RDD

def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。

def main(args: Array[String]): Unit = {  //默认分区12个  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test").set("spark.default.parallelism", "12"))  var rdd1 = sc.makeRDD(1 to 10, 2)  var rdd2 = sc.makeRDD(Array('A', 'B', 'C', 'D', 'E'), 2)  var rdd3 = sc.makeRDD(Array('a', 'b', 'c', 'd', 'e'), 2)  var rdd4 = rdd1.zipPartitions(rdd2, rdd3) {    (rdd1Iter, rdd2Iter, rdd3Iter) => {      var result = List[String]()      while (rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {        result ::= (rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())      }      result.iterator    }  }.collect.foreach(println(_))}

 

16/12/20 15:06:21 INFO DAGScheduler: ResultStage 0 (collect at ShellTest.scala:32) finished in 0.260 s

16/12/20 15:06:21 INFO DAGScheduler: Job 0 finished: collect at ShellTest.scala:32, took 0.816710 s
2_B_b
1_A_a
8_E_e
7_D_d
6_C_c
16/12/20 15:06:21 INFO SparkContext: Invoking stop() from shutdown hook

  • 参数是三个RDD

def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

 

用法同上面,只不过这里又多了个一个RDD而已。

转载于:https://my.oschina.net/chensanti234/blog/808861

你可能感兴趣的文章
SAP QM Batch to Batch的转移过账事务中的Vendor Batch
查看>>
本期最新 9 篇论文,帮你完美解决「读什么」的问题 | PaperDaily #19
查看>>
图解SSIS监视文件夹并自动导入数据
查看>>
Lucene.Net 2.3.1开发介绍 —— 四、搜索(一)
查看>>
MyBatis Review——开发Dao的方法
查看>>
技术研发国产化进程加快 看传感器企业如何展示十八般武艺
查看>>
技术助力第三次革命
查看>>
《HTML与CSS入门经典(第8版)》——2.6 总结
查看>>
新手指南:在 Ubuntu 和 Fedora 上安装软件包
查看>>
在 CentOS7.0 上搭建 Chroot 的 Bind DNS 服务器
查看>>
大型网站的 HTTPS 实践(二):HTTPS 对性能的影响
查看>>
《Swift 权威指南》——第6章,第6.10节嵌套函数
查看>>
《自己动手做交互系统》——1.3 本章小结
查看>>
Mobile devices bundled with malware?
查看>>
《JavaScript面向对象精要》——1.5 访问属性
查看>>
《Python数据可视化编程实战》—— 第 1 章 准备工作环境
查看>>
Android应用性能优化最佳实践.1.1 Android Studio的优势
查看>>
《设计模式解析(第2版•修订版)》—第2章 2.2节什么是UML
查看>>
【直播】APP全量混淆和瘦身技术揭秘
查看>>
10个大坑,当你产品上架AppStore会遇到
查看>>