0%

Spark之RDD的理解

Spark操作数据,数据是以什么样的形式存在的?Spark是如何操作数据的呢?

RDD引入

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。

  • Dataset:一个数据集,简单的理解为集合,用于存放数据的
  • Distributed:它的数据是分布式存储,并且可以做分布式的计算
  • Resilient:弹性的
    • 它表示的是数据可以保存在磁盘,也可以保存在内存中
    • 数据分布式也是弹性的
    • 弹性:并不是指他可以动态扩展,而是容错机制
      • RDD会在多个节点上存储,就和hdfs的分布式道理是一样的。hdfs文件被切分为多个block存储在各个节点上,而RDD是被切分为多个partition。不同的partition可能在不同的节点上
      • spark读取hdfs的场景下,spark把hdfs的block读到内存就会抽象为spark的partition。
      • spark计算结束,一般会把数据做持久化到hive,hbase,hdfs等等。我们就拿hdfs举例,将RDD持久化到hdfs上,RDD的每个partition就会存成一个文件,如果文件小于128M,就可以理解为一个partition对应hdfs的一个block。反之,如果大于128M,就会被且分为多个block,这样,一个partition就会对应多个block。
  • 不可变
    • 为了做并行计算,且保证对临界资源访问的数据安全,将数据集做成不可变是比较好的实现方式。
    • 同时,不可变可以保证数据计算出问题时,从上一个节点开始重新计算,不必从出发点重新计算。如 rdd0 -> rdd1 -> rdd2, 当 rdd1->rdd2,计算异常时,只需要重新计算rdd1->rdd2该环节即可。
  • 可分区
    • 将RDD数据切分成多个partition容错存储。
  • 并行计算

RDD的创建

第一步 创建sparkContext

SparkContext是Spark程序的入口,代表了和Spark集群的链接。在Spark集群中通过SparkContext来创建RDD。

在创建sparkContext需要一个SparkConf, 用来保存Spark应用的连接信息。

1
2
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

parrellelize方式

  • 进入pyspark环境,可以在spark UI中看到当前的Spark作业 在浏览器访问当前centos的4040端口

    image-20190909153011038

    可以在spark UI中看到当前的Spark作业 在浏览器访问当前的4040端口。

    image-20190909153034238

    实例如下:

    image-20190909153640092

    同时,也可以访问spark UI,查看运行情况。

    1
    2
    3
    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data,5)
    distData.reduce(lambda a, b: a + b)

    说明:

    Spark将为群集的每个分区(partition)运行一个任务(task)。

    通常,可以根据CPU核心数量指定分区数量(每个CPU有2-4个分区)

    如未指定分区数量,Spark会自动设置分区数。

通过外部数据创建RDD

​ PySpark可以从Hadoop支持的任何存储源创建RDD,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。

eg

将words.txt放入HDFS中

1
hadoop fs -put words.txt /tmp/

访问并查看数据

1
2
rdd1 = sc.textFile('/tmp/words.txt')
rdd1.collect()

RDD的常用操作

  • RDD 支持两种类型的操作transformation和action:
    • transformation(创建)
      • 从一个已经存在的数据集创建一个新的数据集,eg:map
        • rddA ——->transformation ——> rddB
    • action(获取结果)
      • 获取对数据进行运算操作之后的结果,eg:reduce
  • 所有的transformation操作都是惰性的(lazy)
    • 不会立即计算结果,只有调用action一类的操作之后才会计算所有transformation。
    • 这种设计使Spark运行效率更高
    • 例如map reduce 操作,map创建的数据集将用于reduce,map阶段的结果不会返回,仅会返回reduce结果。

RDD Transformation算子

  • map: map(func)

    • 将func函数作用到数据集的每一个元素上,生成一个新的RDD返回

      1
      2
      3
      4
      >>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
      >>> rdd2 = rdd1.map(lambda x: x+1)
      >>> rdd2.collect()
      [2, 3, 4, 5, 6, 7, 8, 9, 10]
      1
      2
      3
      4
      5
      6
      7
      >>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
      >>> def add(x):
      ... return x+1
      ...
      >>> rdd2 = rdd1.map(add)
      >>> rdd2.collect()
      [2, 3, 4, 5, 6, 7, 8, 9, 10]
  • filter

    • filter(func) 选出所有func返回值为true的元素,生成一个新的RDD返回

      1
      2
      3
      4
      5
      >>> rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9],3)
      >>> rdd2 = rdd1.map(lambda x:x*2)
      >>> rdd3 = rdd2.filter(lambda x:x>4)
      >>> rdd3.collect()
      [6, 8, 10, 12, 14, 16, 18]
  • flatmap

    • flatMap会先执行map的操作,再将所有对象合并为一个对象

      1
      2
      3
      4
      >>> rdd1 = sc.parallelize(["a b c","d e f","h i j"])
      >>> rdd2 = rdd1.flatMap(lambda x:x.split(" "))
      >>> rdd2.collect()
      ['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']

      flatMap和map的区别:flatMap在map的基础上将结果合并到一个list中

      1
      2
      3
      4
      >>> rdd1 = sc.parallelize(["a b c","d e f","h i j"])
      >>> rdd2 = rdd1.map(lambda x:x.split(" "))
      >>> rdd2.collect()
      [['a', 'b', 'c'], ['d', 'e', 'f'], ['h', 'i', 'j']]
  • intersection

    • 对两个RDD求交集
    1
    2
    3
    4
    5
    6
    >>> rdd1 = sc.parallelize([("a",1),("b",2)])
    >>> rdd2 = sc.parallelize([("c",1),("b",3)])
    >>> rdd3 = rdd1.union(rdd2)
    >>> rdd4 = rdd3.intersection(rdd2)
    >>> rdd4.collect()
    [('c', 1), ('b', 3)]
  • groupByKey

    • 以元组中的第0个元素作为key,进行分组,返回一个新的RDD

      1
      2
      3
      4
      5
      6
      >>> rdd1 = sc.parallelize([("a",1),("b",2)])
      >>> rdd2 = sc.parallelize([("c",1),("b",3)])
      >>> rdd3 = rdd1.union(rdd2)
      >>> rdd4 = rdd3.groupByKey()
      >>> rdd4.collect()
      [('a', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5898>), ('c', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5518>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5f28>)]
    • groupByKey之后的结果中 value是一个Iterable

      1
      2
      3
      4
      5
      6
      >>> result[2]
      ('b', <pyspark.resultiterable.ResultIterable object at 0x7fba6c18e518>)
      >>> result[2][1]
      <pyspark.resultiterable.ResultIterable object at 0x7fba6c18e518>
      >>> list(result[2][1])
      [2, 3]
  • reduceByKey

    • 将key相同的键值对,按照Function进行计算
    1
    2
    3
    >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
    >>> rdd.reduceByKey(lambda x,y:x+y).collect()
    [('b', 1), ('a', 2)]
  • sortByKey

    • sortByKey(ascending=True, numPartitions=None, keyfunc=>)

      Sorts this RDD, which is assumed to consist of (key, value) pairs.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
    >>> sc.parallelize(tmp).sortByKey().first()
    ('1', 3)
    >>> sc.parallelize(tmp).sortByKey(True, 1).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
    [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
    >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
    >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
    >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
    [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]

    RDD Action算子

    • collect

      • 返回一个list,list中包含 RDD中的所有元素
      • 只有当数据量较小的时候使用Collect 因为所有的结果都会加载到内存中
    • reduce

      • reduceRDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
      1
      2
      3
      >>> rdd1 = sc.parallelize([1,2,3,4,5])
      >>> rdd1.reduce(lambda x,y : x+y)
      15
    • first

      • 返回RDD的第一个元素
      1
      2
      >>> sc.parallelize([2, 3, 4]).first()
      2
    • take

      • 返回RDD的前N个元素
      • take(num)
      1
      2
      3
      4
      5
      6
      >>> sc.parallelize([2, 3, 4, 5, 6]).take(2)
      [2, 3]
      >>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
      [2, 3, 4, 5, 6]
      >>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
      [91, 92, 93]
    • count

      返回RDD中元素的个数

      1
      2
      >>> sc.parallelize([2, 3, 4]).count()
      3
觉得不错?