环境
  虚拟机:VMware 10
  Linux版本:CentOS-6.5-x86_64
  客户端:Xshell4
  FTP:Xftp4
  jdk1.8
  scala-2.10.4(依赖jdk1.8)
  spark-1.6

一、receiver模式

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

【SparkStreaming学习之三】 SparkStreaming和kafka整合 随笔 第1张
1、receiver模式理解
在SparkStreaming程序运行起来后,Executor中会有receiver tasks接收kafka推送过来的数据。数据会被持久化,默认级别为MEMORY_AND_DISK_SER_2,这个级别也可以修改。receiver task对接收过来的数据进行存储和备份,这个过程会有节点之间的数据传输。备份完成后去zookeeper中更新消费偏移量,然后向Driver中的receiver tracker汇报数据的位置。最后Driver根据数据本地化将task分发到不同节点上执行。

2、receiver模式中存在的问题
当Driver进程挂掉后,Driver下的Executor都会被杀掉,当更新完zookeeper消费偏移量的时候,Driver如果挂掉了,就会存在找不到数据的问题,相当于丢失数据。
如何解决这个问题?
开启WAL(write ahead log)预写日志机制,在接受过来数据备份到其他节点的时候,同时备份到HDFS上一份(我们需要将接收来的数据的持久化级别降级到MEMORY_AND_DISK),这样就能保证数据的安全性。不过,因为写HDFS比较消耗性能,要在备份完数据之后才能进行更新zookeeper以及汇报位置等,这样会增加job的执行时间,这样对于任务的执行提高了延迟度。

3、receiver模式代码

 

4、receiver的并行度设置
receiver的并行度是由spark.streaming.blockInterval来决定的,默认为200ms,假设batchInterval为5s,那么每隔blockInterval就会产生一个block,这里就对应每批次产生RDD的partition,这样5秒产生的这个Dstream中的这个RDD的partition为25个,并行度就是25。如果想提高并行度可以减少blockInterval的数值,但是最好不要低于50ms。

【SparkStreaming学习之三】 SparkStreaming和kafka整合 随笔 第2张
由于receiver模式存在的问题,目前这种模式在实际生产中用的较少。

二、Driect模式

  【SparkStreaming学习之三】 SparkStreaming和kafka整合 随笔 第3张

 

【SparkStreaming学习之三】 SparkStreaming和kafka整合 随笔 第4张

1、Direct模式理解
SparkStreaming+kafka 的Driect模式就是将kafka看成存数据的一方,不是被动接收数据,而是主动去取数据。消费者偏移量也不是用zookeeper来管理,而是SparkStreaming内部对消费者偏移量自动来维护,默认消费偏移量是在内存中,当然如果设置了checkpoint目录,那么消费偏移量也会保存在checkpoint中。当然也可以实现用zookeeper来管理。

2、Direct模式并行度设置
Direct模式的并行度是由读取的kafka中topic的partition数决定的。

3、Direct模式代码

 

三、相关配置
1、反压机制:
spark.streaming.backpressure.enabled 默认false

2、blockInterval:
spark.streaming.blockInterval 默认200ms

3、接收数据速率:
spark.streaming.receiver.maxRate 默认没有设置

4、预写日志:
spark.streaming.receiver.writeAheadLog.enable 默认false没有开启

5、
spark.streaming.stopGracefullyOnShutdown

6、
spark.streaming.kafka.maxRatePerPartition

 

参考:
Spark

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄