admin管理员组文章数量:1122852
RDD
目录
一、创建RDD
1、从内存中创建
2、从文件中创建RDD
二、RDD分区与分区数据匹配
1、内存分区的设定
2、内存分区数据的匹配
3、文件分区的设定
4、文件分区数据的匹配
三、RDD转换算子
1、value类型
(1)map,可以进行映射转换,可以是类型,可以是值
(2) mapPartitions 进去迭代器,返回迭代器
(3)mapPartitionsWithIndex 可以获取指定分区的数据
(4)flatMap 扁平化
(5)groupby 分组
(6)filter 过滤
(7)sample 随机抽数,放回与不放回
(8) distinct 去重
(9)coalesce 缩小分区
(10)repartition 扩大分区
(11)sortBy 默认升序排序
2、双value类型
(1)交集、并集、差集、拉链
3、key value类型
(1)partitionBy 将数据重新分配
(2)reduceByKey 相同key进行 value聚合
(3) groupByKey 相同的key的数据分到一个组中,形成一个对偶元组
(4)reduceByKey 和 groupByKey
(5)aggregateByKey 分区内,分区间计算逻辑不一致
(6)foldByKey 分区内与分区间计算逻辑一致,简化写
(7)72 一个小练习,获取相同key的平均值
(8)combineByKey
(9)74 reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
(10) join 和 leftOuterJoin
(11) cogroup 实现分组连接
四、RDD行动算子
1、什么叫行动算子?
2、行动算子
(1)reduce、collect、count、first、take、takeOrdered
(2) aggregate 计算逻辑和 aggregateByKey
(3)fold 当aggregate 分区内与分区间计算逻辑一致时,使用它进行简化
(4)countByValue 统计每个出现的次数
(5)countByKey 统计每个key出现的次数
(6)save相关算子
五、实现wordcou的11种方法
一、创建RDD
1、从内存中创建
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test {def main(args: Array[String]): Unit = {//准备环境val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)//创建RDD//从内存中创建RDD,将内存中集合的数据作为处理的数据源val seq = Seq[Int](1, 2, 3, 4)//collect 汇总// sc.parallelize(seq).collect().foreach(println)
// makeRDD方法在底层实现就是调用,rdd的对象parallelize 方法 !!!sc.makeRDD(seq).collect().foreach(println)//关闭环境sc.stop()}
}
2、从文件中创建RDD
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)// 从文件中创建RDD,作为处理的数据源// path路径以当前的环境,根路径为基准,可以写绝对路径,也可以写相对路径// 路径可以是具体文件,也可是目录,是目录就会统计目录下所有文件内容// 路径也可是使用通配符 如:Data/Wc*//路径也可是 hdfs路径 如: hdfs://hadoop:9000/test.txtval rdd = sc.textFile("Data/Wcdata.txt")rdd.collect().foreach(println)sc.stop()}
}
二、RDD分区与分区数据匹配
1、内存分区的设定
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")//配置分区conf.set("spark.default.parallelism","4")val sc = new SparkContext(conf)//makeRDD 可以传递第二个参数,表示分区的数量//若不写第二个参数,使用默认值:defaultParallelism// scheduler.conf.getInt("spark.default.parallelism", totalCores)//spark在默认情况下,从配置对象中获取参数 spark.default.parallelism// 如果获取不到,就使用 totalCores 属性,这个属性取值为当前的环境最大可用核数val rdd = sc.makeRDD(List(1, 2, 3, 4),2)//saveAsTextFile将处理的数据保存成分区文件rdd.saveAsTextFile("output")sc.stop()}
}
2、内存分区数据的匹配
3、文件分区的设定
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")//配置分区conf.set("spark.default.parallelism","4")val sc = new SparkContext(conf)// textFile 也可以默认指定分区//minPartitions 最小分区数量//math.min(defaultParallelism, 2)//若不想使用默认的分区数量,可以使用第二个分区,指定分区数//spark 读取文件,底层使用的就是hadoop的读取方式//分区计算方式:/*比如指定两个分区,文件有7个字节(回车等也算)* 文件 totalSize = 7* 指定分区数 par =2* goalSize = 7/2 = 3* 那么每个分区需要存放2 还余 1 ,1占3的百分之30,* 超过hadoop分区百分之10,就会再增加一个分区,所以最后是 3个分区* */val rdd = sc.textFile("Data/Wcdata.txt",3)//saveAsTextFile将处理的数据保存成分区文件rdd.saveAsTextFile("output")sc.stop()}
}
4、文件分区数据的匹配
三、RDD转换算子
1、value类型
(1)map,可以进行映射转换,可以是类型,可以是值
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1, 2, 3, 4))//1 2 3 4 -> 2 4 6 8//1 写法逻辑
// def mapfunction(num:Int): Int = {
// num *2
// }
// rdd.map(mapfunction).collect().foreach(println)//2 简易写法
// rdd.map((num: Int) => {num *2}).collect().foreach(println)//3 将2再次简化
// 当代码逻辑只有一行,花括号可以省略
// rdd.map((num: Int) => num *2).collect().foreach(println)
// 参数类型可以自动推断出来,类型可以省略
// rdd.map((num) => num *2).collect().foreach(println)
// 如果参数列表只有一个,小括号可以省略
// rdd.map(num => num *2).collect().foreach(println)
// 参数在逻辑中只出现一次,并且按照顺序出现,可以使用下划线代替rdd.map(_*2).collect().foreach(println)sc.stop()}
}
(2) mapPartitions 进去迭代器,返回迭代器
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1, 2, 3, 4),2)//1 2 3 4 -> 2 4 6 8// mapPartitions 空间换时间,会将整个分区的数据,加载到内存中,// 然后以分区为单位进行数据加载转换操作,处理完的数据不会被释放,所以注意内存,防止溢出rdd.mapPartitions(num =>{num.map(_*2)}).collect().foreach(println)//要求返回迭代器,求每个分区最大rdd.mapPartitions(num => {List(num.max).iterator}).collect.foreach(println)sc.stop()}
}
(3)mapPartitionsWithIndex 可以获取指定分区的数据
获取第二个分区的数据
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1, 2, 3, 4),2)//1 2 3 4 -> 2 4 6 8//需求只想获取第二个分区数据rdd.mapPartitionsWithIndex((index,iter) => {if (index == 1) {iter}else {//不符合返回一个空Nil.iterator}})sc.stop()}
}
查看数据在哪个分区
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1, 2, 3, 4))//1 2 3 4 -> 2 4 6 8//需求 查看数据在哪个分区rdd.mapPartitionsWithIndex((index,inter) =>{inter.map(num => {(index,num)})}).collect().foreach(println)sc.stop()}
}
(4)flatMap 扁平化
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(List(1, 2), List(3, 4)))
// List(1, 2)
// List(3, 4)
// 将数据拆分成个体rdd.flatMap(list =>{list}).collect().foreach(println)// 拆分 字符串val rdd1 = sc.makeRDD(List("hello spark", "spark hadoop"))rdd1.flatMap(list =>{list}).collect().foreach(println)// 这样打平是每个字母包括空格都打平,要进行分隔rdd1.flatMap(list =>{list.split(" ")}).collect().foreach(println)sc.stop()}
}
模式匹配
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(List(1,2),1,2))// 模式匹配rdd.flatMap( data =>{data match{case list:List[_] => listcase dat => List(dat)}}).collect().foreach(println)sc.stop()}
}
(5)groupby 分组
分组原理
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1,2,3,4),2)//1分组原理//groupBy会将数据源中每一个数据进行分区判断,根据返回的分组key进行分组// 相同的 key值会被放在同一个组中def groupa(num: Int): Int ={
// 取模为keynum %2}rdd.groupBy(groupa).collect().foreach(println)//2sc.stop()}
}
根据单词的首字母进行分组
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List("hello","spark","hello","hive"),2)rdd.groupBy(_.charAt(0)).collect().foreach(println)sc.stop()}
}
groupby 分组和分区没有必然联系
groupby 是一个将数据打乱重新组成的过程,所以有shuffle
(6)filter 过滤
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test3 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1, 2, 3, 4))// 只保留奇数
// rdd.filter(num =>{num%2!=0})rdd.filter(_%2!=0).collect().foreach(println)sc.stop()}
}
当对数据进行筛选过滤后,符合规则的数据留下,不符和的舍去,分区数不变,但这很可能造成分区内数据分布的不均匀,导致数据倾斜
(7)sample 随机抽数,放回与不放回
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd =sc.makeRDD(List(1,2,3,4),1)// 抽取数据不放回(伯努利算法)// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要// 第一个参数:抽取的数据是否放回,false:不放回// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;// 第三个参数:随机数种子val dataRDD1 = rdd.sample(false, 0.5)// 抽取数据放回(泊松算法)// 第一个参数:抽取的数据是否放回,true:放回;false:不放回// 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数// 第三个参数:随机数种子val dataRDD2 = rdd.sample(true, 2)sc.stop()}
}
有啥用?
在实际开发过程中,往往会出现数据倾斜的情况。那么可以从数据倾斜的分区中抽取数据,查看数据的规则。分析后,可以进行改善处理,让数据更加均匀。
(8) distinct 去重
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1,2,3,4,1,2),1)// 原理 map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)val rdd1 = rdd.distinct()val rdd2 = rdd.distinct(2)//发现重新分为两个区rdd2.saveAsTextFile("output")sc.stop()}
}
(9)coalesce 缩小分区
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1,2,3,4,1,2),6)//coalesce算子默认情况下无法扩大分区,因为默认情况下不会打乱数据。// 扩大分区是没有意义的,如果就想扩大分区,那么必须使用shuffle,打乱数据。即第二个参数val rdd1 = rdd.coalesce(2)rdd1.saveAsTextFile("output")sc.stop()}
}
(10)repartition 扩大分区
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1,2,3,4,1,2),2)//repartition底层就是coalesce(numPartitions, shuffle = true)val rdd1 = rdd.repartition(4)rdd1.saveAsTextFile("output")sc.stop()}
}
repartition算子其实底层调用的就是coalesce算子,只不过固定使用了shuffle的操作,可以让数据更均衡一下,可以有效防止数据倾斜问题。
如果缩减分区,一般就采用coalesce,如果想扩大分区,就采用repartition
(11)sortBy 默认升序排序
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1,2,6,4,1,2),2)//默认升序,不改变分区,有shufflerdd.sortBy(num => num).collect().foreach(println)//第二个参数 false 降序rdd.sortBy(num => num,false).collect().foreach(println)sc.stop()}
}
2、双value类型
(1)交集、并集、差集、拉链
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test4 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd1 = sc.makeRDD(List(1, 2, 3, 4))val rdd2 = sc.makeRDD(List(3,4,5,6))//交集println(rdd1.intersection(rdd2).collect().mkString(","))//并集println(rdd1.union(rdd2).collect().mkString(","))//差集 站在rdd1的角度上去看和rdd2的差集 【1,2】println(rdd1.subtract(rdd2).collect().mkString(","))//拉链 预想结果 13 24 35 46println(rdd1.zip(rdd2).collect().mkString(","))sc.stop()}
}
如果两个RDD数据类型不一致怎么办?会出错
交集并集和差集要求两个数据源数据类型保持一致
拉链操作,两个数据源类型可以不一致
但是拉链操作要求我们的两个数据源的分区数量要保持一致,并且还要保持两个数据源分区中的数据要保持一致
3、key value类型
(1)partitionBy 将数据重新分配
package com.testimport org.apache.spark.{HashPartitioner, SparkConf, SparkContext}object Test5 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1, 2, 3, 4))//partitionBy 将数据进行重分配 使用规则HashPartitionerval newrdd = rdd.map(num => {(num, 1)}).partitionBy(new HashPartitioner(2))newrdd.saveAsTextFile("output")sc.stop()}
}
(2)reduceByKey 相同key进行 value聚合
package com.testimport org.apache.spark.{SparkConf, SparkContext}object reduceBykey {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 1), ("c", 3)))//reduceByKey//相同的key进行value数据的聚合(两两聚合)//如果key的数据只有一个,是不会参与运算的!!!直接返回rdd.reduceByKey((x,y) =>(x+y)).collect().foreach(println)sc.stop()}
}
(3) groupByKey 相同的key的数据分到一个组中,形成一个对偶元组
package com.testimport org.apache.spark.{SparkConf, SparkContext}object groupBykey {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 1), ("c", 3)))//groupByKey//将数据源中的数据,相同的key的数据分到一个组中,形成一个对偶元组//元组中第一个元素 是 key//元组中第二个元素 是 相同key的value 的集合rdd.groupByKey().collect().foreach(println)println("-----------------------------------")rdd.groupBy(_._1).collect().foreach(println)sc.stop()}
}
结果:
(a,CompactBuffer(1, 2))
(b,CompactBuffer(1))
(c,CompactBuffer(3))
-----------------------------------
(a,CompactBuffer((a,1), (a,2)))
(b,CompactBuffer((b,1)))
(c,CompactBuffer((c,3)))
(4)reduceByKey 和 groupByKey
groupByKey大致原理
reduceByKey 大致原理(支持分区内预聚合,可以有效减少shuffle时的数据量,提升shuffle的性能 )
总结:
两个算子没有使用上的区别。所以使用的时候需要根据应用场景来选择。
从性能上考虑,reduceByKey存在预聚合功能,这样,在shuffle的过程中,落盘的数据量会变少,所以读写磁盘的速度会变快。性能更高
(5)aggregateByKey 分区内,分区间计算逻辑不一致
reduceByKey 支持分区内预聚合 可以有效减少shuffle落盘数据量
但是 这要求我们 使用reduceByKey 时分区内与分区间的计算规则要一样,例如统计wc
相同的key 两两之间聚合
但是当遇到计算逻辑不一致时,比如分区内求最大值,分区间求和,此时reduceByKey 就不太能满足要求。
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)),2)//第一个参数列表//需要传递一个参数,表示为初始值// 主要用于当碰见第一个key的时候,和value进行分区内的计算//第二个参数列表//需要两个参数,第一个参数 表示分区内计算规则// 第二个参数 表示分区间计算规则rdd.aggregateByKey(0)((x,y) => math.max(x,y),(x,y) => x +y).foreach(println)sc.stop()}
}
(6)foldByKey 分区内与分区间计算逻辑一致,简化写
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("a", 4)),2)// rdd.aggregateByKey(0)(_+_,_+_).collect().foreach(println)//如果聚合计算时,分区内和分区间计算逻辑一致,spark提供了简化方法rdd.foldByKey(0)(_+_).collect().foreach(println)sc.stop()}
}
(7)72 一个小练习,获取相同key的平均值
(8)combineByKey
一个分区下:
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test10 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5), ("c", 6)),1)rdd.mapPartitionsWithIndex((index,partition) => {println("----------------"+index)partition.map(x => s"${index},${x}")}).foreach(println)val res = rddbineByKey((v:Int) => v + "_" , //初始化(c:String,v:Int) => c + "@" + v , //同一分区内计算(c1:String,c2:String) => c1 + "$" + c2 , //跨分区合并)println(res.collect().mkString(","))sc.stop()}
}
打印结果为:
----------------0
0,(a,1)
0,(a,2)
0,(a,3)
0,(b,4)
0,(b,5)
0,(c,6)
(a,1_@2@3),(b,4_@5),(c,6_)
三个分区下:
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test10 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4), ("b", 5), ("c", 6)),3)rdd.mapPartitionsWithIndex((index,partition) => {println("----------------"+index)partition.map(x => s"${index},${x}")}).foreach(println)val res = rddbineByKey((v:Int) => v + "_" , //初始化(c:String,v:Int) => c + "@" + v , //同一分区内计算(c1:String,c2:String) => c1 + "$" + c2 , //跨分区合并)println(res.collect().mkString(","))sc.stop()}
}
打印结果为:
----------------1
----------------0
----------------2
2,(b,5)
1,(a,3)
0,(a,1)
1,(b,4)
2,(c,6)
0,(a,2)
(c,6_),(a,1_@2$3_),(b,4_$5_)
(9)74 reduceByKey、foldByKey、aggregateByKey、combineByKey的区别?
从源码的角度来讲,四个算子的底层逻辑是相同的。
aggregateByKey的算子会将初始值和第一个value使用分区内的计算规则进行计算
foldByKey的算子的分区内和分区间的计算规则相同,并且初始值和第一个value使用的规则相同
combineByKey第一个参数就是对第一个value进行处理,所以无需初始值。
reduceByKey不会对第一个value进行处理,分区内和分区间计算规则相同
上面的四个算子都支持预聚合功能。所以shuffle性能比较高
上面的四个算子都可以实现WordCount
(10) join 和 leftOuterJoin
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("d",0),("a",22)))val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))// join// 两个不同数据源的数据,相同key的的value会连接在一起,形成元组// 类似inner joinrdd1.join(rdd2).collect().foreach(println)println("-------------------------------------------------")//leftOuterJoinrdd1.leftOuterJoin(rdd2).collect().foreach(println)sc.stop()}
}
打印结果:
(a,(1,4))
(a,(22,4))
(b,(2,5))
(c,(3,6))
-------------------------------------------------
(a,(1,Some(4)))
(a,(22,Some(4)))
(b,(2,Some(5)))
(c,(3,Some(6)))
(d,(0,None))
(11) cogroup 实现分组连接
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3),("d",0),("a",22)))val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6)))// cogroup : connect + group//rdd1.cogroup(rdd2).collect().foreach(println)sc.stop()}
}
打印结果:
(a,(CompactBuffer(1, 22),CompactBuffer(4)))
(b,(CompactBuffer(2),CompactBuffer(5)))
(c,(CompactBuffer(3),CompactBuffer(6)))
(d,(CompactBuffer(0),CompactBuffer()))
四、RDD行动算子
1、什么叫行动算子?
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1, 2, 3, 4))//行动算子,其实就是触发作业(job)执行的方法//底层代码调用的是环境对象的 runJob 方法//底层代码会创建ActiveJob,并且提交执行rdd.collect()sc.stop()}
}
2、行动算子
(1)reduce、collect、count、first、take、takeOrdered
package com.testimport org.apache.spark.{SparkConf, SparkContext}object Test {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1, 2, 3, 4))//println(rdd.reduce(_+_))// collect :会将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组rdd.collect().foreach(println)// count: 获取数据源中数据的个数println(rdd.count())//first :获取数据源中数据的第一个print(rdd.first())//take : 获取数据中的 N 个数据rdd.take(3).foreach(println)//takeOrdered:数据排序后,取N个数据rdd.takeOrdered(3).foreach(println)sc.stop()}
}
(2) aggregate 计算逻辑和 aggregateByKey
区别,它的初始值会既参与分区内计算,也参与分区间计算
package com.peizk.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("My HdfsApp")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1,2,3,4),2)println(rdd.aggregate(10)(_ + _, _ + _))sc.stop()}
}
(3)fold 当aggregate 分区内与分区间计算逻辑一致时,使用它进行简化
package com.peizk.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("My HdfsApp")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1,2,3,4),2)println(rdd.fold(10)(_ + _))sc.stop()}
}
(4)countByValue 统计每个出现的次数
package com.peizk.testimport org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("My HdfsApp")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(1,2,1,2,3,4,4,4,4),2)//countByValue,可以统计出每个值出现的次数println(rdd.countByValue())sc.stop()}
}
打印结果:
Map(4 -> 4, 2 -> 2, 1 -> 2, 3 -> 1)
(5)countByKey 统计每个key出现的次数
package com.peizk.testimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("My HdfsApp")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))// 统计每种key的个数println(rdd.countByKey())sc.stop()}
}
(6)save相关算子
package com.peizk.testimport org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object Test2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local").setAppName("My HdfsApp")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))rdd.saveAsTextFile("output1")rdd.saveAsObjectFile("output2")//saveAsSequenceFile 方法要求数据的格式必须为 K -V 类型rdd.saveAsSequenceFile("output3")sc.stop()}
}
五、实现wordcou的11种方法
package com.testimport org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutableobject Test {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("my app")val sc = new SparkContext(conf)wordcount1(sc)sc.stop()}//1 group bydef wordcount1(sc: SparkContext): Unit ={val rdd = sc.makeRDD(List("Hello spark", "Hello scala"))val word = rdd.flatMap(_.split(" "))val group = word.groupBy(word => word)val worcount = group.mapValues(iter => iter.size)}// 2 groupByKey 要有k v类型 效率不高 走shuffledef wordcount2(sc: SparkContext): Unit ={val rdd = sc.makeRDD(List("Hello spark", "Hello scala"))val word = rdd.flatMap(_.split(" "))val wordone = word.map((_, 1))val group = wordone.groupByKey()val worcount = group.mapValues(iter => iter.size)}// 3 reduceByKey 较groupByKey 效率更好点def wordcount3(sc: SparkContext): Unit ={val rdd = sc.makeRDD(List("Hello spark", "Hello scala"))val word = rdd.flatMap(_.split(" "))val wordone = word.map((_, 1))val worcount = wordone.reduceByKey(_+_)}// 4 aggregateByKeydef wordcount4(sc: SparkContext): Unit ={val rdd = sc.makeRDD(List("Hello spark", "Hello scala"))val word = rdd.flatMap(_.split(" "))val wordone = word.map((_, 1))val worcount = wordone.aggregateByKey(0)(_+_,_+_)}// 5 foldByKey 当aggregateByKey分区内分区外规则一样时def wordcount5(sc: SparkContext): Unit ={val rdd = sc.makeRDD(List("Hello spark", "Hello scala"))val word = rdd.flatMap(_.split(" "))val wordone = word.map((_, 1))val worcount = wordone.foldByKey(0)(_+_)}// 6 combineByKey 需要传三个参数def wordcount6(sc: SparkContext): Unit ={val rdd = sc.makeRDD(List("Hello spark", "Hello scala"))val word = rdd.flatMap(_.split(" "))val wordone = word.map((_, 1))val worcount = wordonebineByKey(v=>v,(x:Int,y)=>x+y,(x:Int,y:Int)=>x+y)}// 7 countByKeydef wordcount7(sc: SparkContext): Unit = {val rdd = sc.makeRDD(List("Hello spark", "Hello scala"))val word = rdd.flatMap(_.split(" "))val wordone = word.map((_, 1))val worcount = wordone.countByKey()}// 8 countByValuedef wordcount8(sc: SparkContext): Unit ={val rdd = sc.makeRDD(List("Hello spark", "Hello scala"))val word = rdd.flatMap(_.split(" "))val worcount = word.countByValue()}// 9 reduce,aggregate,folddef wordcount9(sc: SparkContext): Unit ={val rdd = sc.makeRDD(List("Hello spark", "Hello scala"))val word = rdd.flatMap(_.split(" "))val mapword = word.map(word => {mutable.Map[String, Long]((word, 1))})val wordcount = mapword.reduce((map1, map2) => {map1})}}
本文标签: rdd
版权声明:本文标题:RDD 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/biancheng/1710478039a763443.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论