map
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素。
输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区。
hadoop fs -cat /tmp/lxw1234/1.txthello worldhello sparkhello hive //读取HDFS文件到RDDscala> var data = sc.textFile("/tmp/lxw1234/1.txt")data: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at :21 //使用map算子scala> var mapresult = data.map(line => line.split("\\s+"))mapresult: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at :23 //运算map算子结果scala> mapresult.collectres0: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive))flatMap
属于Transformation算子,第一步和map一样,最后将所有的输出分区合并成一个。
/使用flatMap算子scala> var flatmapresult = data.flatMap(line => line.split("\\s+"))flatmapresult: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at flatMap at :23 //运算flagMap算子结果scala> flatmapresult.collectres1: Array[String] = Array(hello, world, hello, spark, hello, hive)使用flatMap时候需要注意:
flatMap会将字符串看成是一个字符数组。
看下面的例子:
再看:
scala> data.map(x => x.split("\\s+")).collectres34: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, hive), Array(hi, spark)) scala> data.flatMap(x => x.split("\\s+")).collectres35: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark)这次的结果好像是预期的,最终结果里面并没有把字符串当成字符数组。
这是因为这次map函数中返回的类型为Array[String],并不是String。
flatMap只会将String扁平化成字符数组,并不会把Array[String]也扁平化成字符数组。
distinct
对RDD中的元素进行去重操作。
scala> data.flatMap(line => line.split("\\s+")).collectres61: Array[String] = Array(hello, world, hello, spark, hello, hive, hi, spark) scala> data.flatMap(line => line.split("\\s+")).distinct.collectres62: Array[String] = Array(hive, hello, world, spark, hi)