spark overview
UC Berkeley 的spark数据分析栈
按使用方式划分
- 离线批处理(Mlib,Graphs)
- 交互式查询(spark SQL)
- 时实计算(spark streaming)
spark资源调度
- stanalone
- mesos
- yarn
其中我们使用的是yarn资源调度,也就是运行spark job向集群申请资源的方式与hadoop是一样的,先向resourcemanger,然后在nodemanager,申请container启动applicationMaster,运行excutor
yarn的提交job方式client和cluster
- client提交方式,driver program运行在提交机器上
- cluster方式,driver program是运行在集群中的某个worker中
spark VS hadoop
应用场景
- hadoop的mapreduce适合做大数据集的离线批处理,
- hadoop不是万能的,小数据集(单机能处理的小数据集杀鸡用牛刀),以及复杂的迭代运算,实时计算,在线分析等无能为力,而spark的出现很好的弥补了hadoop的不足之处,因为spark是基于内存的计算框架,适合复杂的迭代计算,spark streaming弥补实时计算的空缺(storm实时性更高,吞吐量,容错方面缺不如spark,稍后介绍spark的容错机制lineage和实时计算与storm的对比)
运行效率
- spark官网效率比较
- 咱门研究中心同事实际的测试报告
- spark官网效率比较
1 | Spark性性能能与与MR相相比比较较提提高高了了13.6% |
- 开发效率比较
- spark基于rdd的操作,是mapreduce的超集,提供我们基于rdd丰富的接口,如filter,disinct,reducebykey等等,而hadoop这些操作需要用户在map或reduce,combine自己编码实现,
- 咱门写mapreduce程序,每个job都要写maper类,reducer类(当然有些job可以不写reducer类,如sqoop导入数据库就只需maper),可能还要写partition,combiner类,而且写完job后,需要构建job与job之间执行的顺序和依赖关系,输入输出的键值类型等;
- 而spark是不需要这么琐碎,对rdd执行多个transform后,当执行一个action动作后(后面将介绍rdd的操作),自动构建一个基于rdd的DAG有向无环执行作业图,使用过pig的同事有所体会,这点类似pig,pig的解释器会将基于数据集的流处理过程,转换为DAG的job链,但spark又优于pig,可以做到过程控制,pig作为一个数据流语言,缺乏过程控制,粗糙的过程控制需要一门动态的脚本语言如python,javascript来实现,而且pig,hive只适合做统计分析作业,面对复杂的处理,如dougelas参数区线的压缩,需要用mapreduce或spark处理。
开发语言支持
- 原生语言scala
- java
- python
- spark1.4后支持R语言
spark的核心RDD
大家可以理解弹性分布式集合就是一个数据集合,这个集合有多个partition组成,而这些partition分布到集群中各节点的worker
创建RDD的方式
基于内存集合
如1到100数字Range作为rdd,val data = sc.parallelize(1 to 100)外部存储系统,如hbase,cassandra,hdfs等, 如val data = sc.textfile(“dataPath”)
基于rdd的操作
transform是lazy执行的,也就是说直到遇到该rdd链执行action操作,才会启动job,执行计算,这种思想跟scala语言的lazy十分相似,下面通过一个简单的scala例子体会下这种思想
1 |
|
广播变量
广播变量是分发到每个worker的只读变量不能修改,功能与hadoop的分布式缓存类似,
目前的dns项目实战使用到是做资源表关联(大数据集与小数据集的关联),存放广播变量中,通过map转换操作做关联,注意广播变量是一个只读变量,不能做修改。
计数器
作业中全局的一个计数器,与hadoop的计数器类似,并不陌生,我们平时跑完mr或者pig的时候会有三种类型计数器的统计,
Framkework计数器,job计数器,hdfs文件系统计数器,注意spark中的计数器是不能在task中求值,只能在driver program中求值
在dns项目中统计各用户群,各运营商,top10的icp,每个icp下统计top10 的host,可先在每个partition中统计top10的icp和top10的host,然后保存到计数器变量中,然后将聚合后结果话单过滤只保留掉计数器中的host和icp,这样可以避免多次迭代调用rdd.top(10)产生NN个job;取五分钟小片数据,采用nn迭代调用rdd.top方式生成库表需要两个小时,并产生了1800多个小job,跑了两个多小时,采用计数器过滤方式,4分多钟就能跑完库表实现入库postgresql
rdd依赖
- narrow依赖(父rdd的同一个partion最多只给子rdd一个partion依赖)
- wide依赖(父rdd的同一个partion被子rdd多个partion依赖)
小结,从计算,存储,容错谈谈rdd
- 计算
注意:由于时间关系,直接截了他人画的图,deamon中存在一点error,正确的代码应该是map(parts=>(parts(0),parts(1).toInt)),第一次map的transform得到的是RDD[Array[String]],不是RDD[List[String]]
code.png)
每个job划分不同的stage,每个stage就是一个Set[task]集合
spark的作业调度,分DAGshedule,和taskshedule二级,跟hadoop的jobtraker,tasktracker两级调度类似
存储
MEMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
DISK_ONLY
MEMORY_ONLY_2, MEMORY_AND_DISK_2
上面是spark是rdd的各种存储策略,是spark计算框架中,默认认为重复计算rdd需要的时间会比从磁盘中读取数据进行的io操作效率高,
因此默认所有的rdd的persist方式都是存在内存中,当内存不足后,会丢弃掉这个rdd,需要时候再根据lineage
机制从新计算,实际开发中那如果认为计算出来的rdd代价远比进行io大,这时可根据情况选择其他持久化策略,如在dns项目中,需要关联ppp的result和record话单后的rdd,采取MEMORY_AND_DISK_SER方式的持久化
- 容错(lineage)
穿插一个小故事:
1 |
|
分析情景:
- rdd就好比传家宝
- 情景中的每个人物就好比不同时候集群中的计算节点中的worker
- 小明变卖宝物,就好比执行了一个action,触发提交job
- 而每代人对宝物加入一个宝石,就好比rdd的transform操作
- rdd的容错是lineage机制,如果当向spark提交job的时候,会构造基于rdd操作的DAG的作业流,这时会有基于rdd依赖链,如果计算过程中某个rdd丢失了,它会从父rdd那重新计算,如果父rdd不存在,会一直回溯上去直到找到父的rdd,然后再依照依赖链重新执行计算,最后执行action操作
spark在项目的实战应用
架构图
项目代码
spark streaming
spark streaming vs Storm(下面是引用研究中心同事的给出的两者对比的报告内容)
1 | Storm和Spark Streaming都是分布式流处理的开源框架。虽然二者功能类似,但是也有着一定的区别。 |
核心DStream
- Dstream简介
Dstream是一组以时间为轴连续的一组rdd - Dstream的输入源
- DStream的transformations操作
- DSstream的action操作
使用场景划分
- 无状态
每次批处理,receiver接收的数据都作为数据Dstream操作
- 有状态updateStateByKey(func)
本次计算,需要用到上次批处理的结果。
比如spark streaming的批处理时间是五分钟,但业务中,我需要统计话单中haohandata.com.cn从程序运行后,每五分钟后haohandata.com.cn这个域名的累加的访问数,这时我们会以上次批处理为key的访问次数,加上本次五分钟批处理得到结果
- windowns
基于窗口的操作,批处理时间,滑动窗口,窗口大小
DNS实时计算实验项目中,统计五分钟粒度各rcode的次分布,
由于存在边界数据,解决的办法采取五分钟为批处理时间,滑动窗口为五分钟,窗口大小为10分钟,每次进行reduceByKeyAndWindow后,会进行过滤,只存这个windown中的中间五分钟数据,再入库cassandra