spark系列——RDD入门

/ Spark / 没有评论 / 46浏览

resilient distributed dataset (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel.

简单点,RDD就是spark用来访问节点中的数据的一种抽象。

wordcount程序

我们先看下wordcount程序:

/**
 * word count程序
 *
 * bash:
 * bin/spark-submit --class "com.example.spark.rdd.WordCountApp" spark-example-1.0-SNAPSHOT.jar /word.txt /word-output
 *
 * @author xuan
 * @since 1.0.0
 */
public class WordCountApp {

    private static final Logger LOG = LoggerFactory.getLogger(WordCountApp.class);

    public static void main(String[] args) {
        // The first thing a Spark program must do is to create a JavaSparkContext object
        // which tells Spark how to access a cluster.
        SparkConf conf = new SparkConf()
                .setAppName(WordCountApp.class.getSimpleName());
        JavaSparkContext sc = new JavaSparkContext(conf);

        //
        String readPath = args[0];
        String savePath = args[1];
        LOG.info("wordcount read file path: {}", readPath);
        LOG.info("wordcount save file path: {}", savePath);

        sc.textFile(readPath)
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b)
                .map(tuple -> tuple._1 + ": " + tuple._2)
                .saveAsTextFile(savePath);
    }

}

当创建好JavaSparkContext对象后,通过textFile()创建Text file RDDs对象,然后调用相关api并行计算。

RDD操作

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).

如spark英文文档中介绍,RDDs支持两种操作:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

惰性计算

对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。

scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29

//行动操作,触发一次真正从头到尾的计算
scala> println(rdd.count())
3

//行动操作,触发一次真正从头到尾的计算
scala> println(rdd.collect().mkString(","))
Hadoop,Spark,Hive

上面代码执行过程中,前后共触发了两次从头到尾的计算。

持久化

在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果整个Spark程序中只有一次行动操作,这当然不会有什么问题。但是,在一些情形下,我们需要多次调用不同的行动操作,这就意味着,每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。

为了解决从头计算,我们可以缓存结果,下次行动操作就不需要从到计算了。

scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29

//会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
scala> rdd.cache()

//第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
scala> println(rdd.count())
3

//第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
scala> println(rdd.collect().mkString(","))
Hadoop,Spark,Hive

最后,可以使用unpersist()方法手动地把持久化的RDD从缓存中移除。

persist()的圆括号中包含的是持久化级别参数,比如,persist(MEMORY_ONLY)表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上。

一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)

官网rdd-persistence

分区

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。 对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:

因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism,比如:

scala>val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)

//设置两个分区
scala>val rdd = sc.parallelize(array,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:29

对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。 如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)。

RDDs Api

推荐

spark2.1.0入门之RDD编程