Spark操作数据,数据是以什么样的形式存在的?Spark是如何操作数据的呢?
RDD引入
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
- Dataset:一个数据集,简单的理解为集合,用于存放数据的
- Distributed:它的数据是分布式存储,并且可以做分布式的计算
- Resilient:弹性的
- 它表示的是数据可以保存在磁盘,也可以保存在内存中
- 数据分布式也是弹性的
- 弹性:并不是指他可以动态扩展,而是容错机制。
- RDD会在多个节点上存储,就和hdfs的分布式道理是一样的。hdfs文件被切分为多个block存储在各个节点上,而RDD是被切分为多个partition。不同的partition可能在不同的节点上
- spark读取hdfs的场景下,spark把hdfs的block读到内存就会抽象为spark的partition。
- spark计算结束,一般会把数据做持久化到hive,hbase,hdfs等等。我们就拿hdfs举例,将RDD持久化到hdfs上,RDD的每个partition就会存成一个文件,如果文件小于128M,就可以理解为一个partition对应hdfs的一个block。反之,如果大于128M,就会被且分为多个block,这样,一个partition就会对应多个block。
- 不可变
- 为了做并行计算,且保证对临界资源访问的数据安全,将数据集做成不可变是比较好的实现方式。
- 同时,不可变可以保证数据计算出问题时,从上一个节点开始重新计算,不必从出发点重新计算。如 rdd0 -> rdd1 -> rdd2, 当 rdd1->rdd2,计算异常时,只需要重新计算rdd1->rdd2该环节即可。
- 可分区
- 将RDD数据切分成多个partition容错存储。
- 并行计算
RDD的创建
第一步 创建sparkContext
SparkContext是Spark程序的入口,代表了和Spark集群的链接。在Spark集群中通过SparkContext来创建RDD。
在创建sparkContext需要一个SparkConf, 用来保存Spark应用的连接信息。
1 | conf = SparkConf().setAppName(appName).setMaster(master) |
parrellelize方式
进入pyspark环境,可以在spark UI中看到当前的Spark作业 在浏览器访问当前centos的4040端口
可以在spark UI中看到当前的Spark作业 在浏览器访问当前的4040端口。
实例如下:
同时,也可以访问spark UI,查看运行情况。
1
2
3data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data,5)
distData.reduce(lambda a, b: a + b)说明:
Spark将为群集的每个分区(partition)运行一个任务(task)。
通常,可以根据CPU核心数量指定分区数量(每个CPU有2-4个分区)
如未指定分区数量,Spark会自动设置分区数。
通过外部数据创建RDD
PySpark可以从Hadoop支持的任何存储源创建RDD,包括本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。
eg
将words.txt放入HDFS中
1 | hadoop fs -put words.txt /tmp/ |
访问并查看数据
1 | rdd1 = sc.textFile('/tmp/words.txt') |
RDD的常用操作
- RDD 支持两种类型的操作transformation和action:
- transformation(创建)
- 从一个已经存在的数据集创建一个新的数据集,eg:map
- rddA ——->transformation ——> rddB
- 从一个已经存在的数据集创建一个新的数据集,eg:map
- action(获取结果)
- 获取对数据进行运算操作之后的结果,eg:reduce
- transformation(创建)
- 所有的transformation操作都是惰性的(lazy)
- 不会立即计算结果,只有调用action一类的操作之后才会计算所有transformation。
- 这种设计使Spark运行效率更高
- 例如map reduce 操作,map创建的数据集将用于reduce,map阶段的结果不会返回,仅会返回reduce结果。
RDD Transformation算子
map: map(func)
将func函数作用到数据集的每一个元素上,生成一个新的RDD返回
1
2
3
41,2,3,4,5,6,7,8,9],3) rdd1 = sc.parallelize([
lambda x: x+1) rdd2 = rdd1.map(
rdd2.collect()
[2, 3, 4, 5, 6, 7, 8, 9, 10]1
2
3
4
5
6
71,2,3,4,5,6,7,8,9],3) rdd1 = sc.parallelize([
def add(x):
return x+1
...
rdd2 = rdd1.map(add)
rdd2.collect()
[2, 3, 4, 5, 6, 7, 8, 9, 10]
filter
filter(func) 选出所有func返回值为true的元素,生成一个新的RDD返回
1
2
3
4
51,2,3,4,5,6,7,8,9],3) rdd1 = sc.parallelize([
lambda x:x*2) rdd2 = rdd1.map(
lambda x:x>4) rdd3 = rdd2.filter(
rdd3.collect()
[6, 8, 10, 12, 14, 16, 18]
flatmap
flatMap会先执行map的操作,再将所有对象合并为一个对象
1
2
3
4"a b c","d e f","h i j"]) rdd1 = sc.parallelize([
lambda x:x.split(" ")) rdd2 = rdd1.flatMap(
rdd2.collect()
['a', 'b', 'c', 'd', 'e', 'f', 'h', 'i', 'j']flatMap和map的区别:flatMap在map的基础上将结果合并到一个list中
1
2
3
4>>> rdd1 = sc.parallelize(["a b c","d e f","h i j"])
>>> rdd2 = rdd1.map(lambda x:x.split(" "))
>>> rdd2.collect()
[['a', 'b', 'c'], ['d', 'e', 'f'], ['h', 'i', 'j']]
intersection
- 对两个RDD求交集
1
2
3
4
5
6"a",1),("b",2)]) rdd1 = sc.parallelize([(
"c",1),("b",3)]) rdd2 = sc.parallelize([(
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.intersection(rdd2)
rdd4.collect()
[('c', 1), ('b', 3)]groupByKey
以元组中的第0个元素作为key,进行分组,返回一个新的RDD
1
2
3
4
5
6"a",1),("b",2)]) rdd1 = sc.parallelize([(
"c",1),("b",3)]) rdd2 = sc.parallelize([(
rdd3 = rdd1.union(rdd2)
rdd4 = rdd3.groupByKey()
rdd4.collect()
[('a', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5898>), ('c', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5518>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7fba6a5e5f28>)]groupByKey之后的结果中 value是一个Iterable
1
2
3
4
5
62] result[
('b', <pyspark.resultiterable.ResultIterable object at 0x7fba6c18e518>)
2][1] result[
<pyspark.resultiterable.ResultIterable object at 0x7fba6c18e518>
2][1]) list(result[
[2, 3]
reduceByKey
- 将key相同的键值对,按照Function进行计算
1
2
3"a", 1), ("b", 1), ("a", 1)]) rdd = sc.parallelize([(
lambda x,y:x+y).collect() rdd.reduceByKey(
[('b', 1), ('a', 2)]sortByKey
sortByKey
(ascending=True, numPartitions=None, keyfunc=> )Sorts this RDD, which is assumed to consist of (key, value) pairs.
1
2
3
4
5
6
7
8
9
10
11'a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] tmp = [(
sc.parallelize(tmp).sortByKey().first()
('1', 3)
True, 1).collect() sc.parallelize(tmp).sortByKey(
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
True, 2).collect() sc.parallelize(tmp).sortByKey(
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
'Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] tmp2 = [(
'whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) tmp2.extend([(
True, 3, keyfunc=lambda k: k.lower()).collect() sc.parallelize(tmp2).sortByKey(
[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]RDD Action算子
collect
- 返回一个list,list中包含 RDD中的所有元素
- 只有当数据量较小的时候使用Collect 因为所有的结果都会加载到内存中
reduce
- reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。
1
2
3>> rdd1 = sc.parallelize([1,2,3,4,5])
>> rdd1.reduce(lambda x,y : x+y)
15first
- 返回RDD的第一个元素
1
22, 3, 4]).first() sc.parallelize([
2take
- 返回RDD的前N个元素
take
(num)
1
2
3
4
5
6>> sc.parallelize([2, 3, 4, 5, 6]).take(2)
[2, 3]
>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
[2, 3, 4, 5, 6]
>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
[91, 92, 93]count
返回RDD中元素的个数
1
2>>> sc.parallelize([2, 3, 4]).count()
3