【SparkStreaming学习之二】 SparkStreaming算子操作
环境
虚拟机:VMware 10
Linux版本:CentOS-6.5-x86_64
客户端:Xshell4
FTP:Xftp4
jdk1.8
scala-2.10.4(依赖jdk1.8)
spark-1.6
一、output operation算子
1、foreachRDD:必须对抽取出来的RDD执行action类算子,代码才能执行。
(1.1)foreachRDD可以拿到DStream中的RDD
(1.2)foreachRDD call方法内,拿到的RDD的算子外的代码在Driver端执行。可以做到动态改变广播变量
package com.wjy.ss; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; public class SparkStreamingTest { public static void main(String[] args) { SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingTest"); JavaSparkContext sc = new JavaSparkContext(conf); sc.setLogLevel("WARN");//设置日志级别 不打印一堆无用的日志 //创建JavaStreamingContext 批次间隔为5秒 JavaStreamingContext jsc = new JavaStreamingContext(sc,Durations.seconds(5)); //监听134.32.123.101 9999端口 获取文本socket流 JavaReceiverInputDStream<String> socketTextStream = jsc.socketTextStream("134.32.123.101", 9999); //接下来进行wordcount JavaDStream<String> words = socketTextStream.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } }); JavaPairDStream<String, Integer> pairWords = words.mapToPair(new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word,1); } }); JavaPairDStream<String, Integer> reduceByKey = pairWords.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); //print()可以加参数 表示多少时间打印一次 上面的transform只有调到print才会执行 //默认打印在此 DStream 中生成的每个 RDD 的前十个元素。加参数前num个元素 //reduceByKey.print(); /** * foreachRDD 可以拿到DStream中的RDD ,对拿到的RDD可以使用RDD的transformation类算子转换,要对拿到的RDD使用action算子触发执行,否则,foreachRDD不会执行。 * foreachRDD 中call方法内,拿到的RDD的算子外,代码是在Driver端执行。可以使用这个算子实现动态改变广播变量。 * */ reduceByKey.foreachRDD(new VoidFunction<JavaPairRDD<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(JavaPairRDD<String, Integer> rdd) throws Exception { //Driver端执行 System.out.println("Driver ......."); //获取SparkContext SparkContext context = rdd.context(); //创建JavaSparkContext JavaSparkContext javaSparkContext = new JavaSparkContext(context); //广播变量 这里可以读取一个文件 文件内容可变 就达到了动态改变广播变量的目的 Broadcast<String> broadcast = javaSparkContext.broadcast("hello wjy"); String value = broadcast.value(); System.out.println(value); JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String,Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple) throws Exception { System.out.println("Executor ......."); return new Tuple2<String, Integer>(tuple._1+"~",tuple._2); } }); mapToPair.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> arg0) throws Exception { System.out.println(arg0); } }); } }); /* * * 执行结果: Driver ....... hello wjy Driver ....... hello wjy 19/04/22 17:26:46 WARN BlockManager: Block input-0-1555925206600 replicated to only 0 peer(s) instead of 1 peers 19/04/22 17:26:47 WARN BlockManager: Block input-0-1555925206800 replicated to only 0 peer(s) instead of 1 peers 19/04/22 17:26:47 WARN BlockManager: Block input-0-1555925207200 replicated to only 0 peer(s) instead of 1 peers Driver ....... hello wjy Executor ....... (ee~,1) Executor ....... (aa~,1) Executor ....... (ll~,1) Executor ....... (gg~,1) Executor ....... (dd~,1) Executor ....... (hh~,1) Executor ....... (wjy~,1) Executor ....... (kk~,1) Executor ....... (jj~,1) Executor ....... (ii~,1) Executor ....... (hello~,31) Executor ....... (tt~,1) Executor ....... (ff~,1) Executor ....... (bb~,1) Executor ....... (world~,17) Executor ....... (cc~,1) Driver ....... hello wjy */ //启动 jsc.start(); //监控:等待中断 jsc.awaitTermination(); //这里stop不会执行 我们在实际应用中根据触发条件执行 比如监控某个文件删除后就执行停止动作 jsc.stop(); } }
2、print
3、saveAsTextFiles
4、saveAsObjectFiles
5、saveAsHadoopFiles
二、transformation类算子
1、transform
(1.1)对Dstream做RDD到RDD的任意操作。
(1.2)transform call方法内,在拿到的RDD的Transformation类算子外 代码是在Driver端执行的,可以使用transform做到动态改变广播变量
代码示例:
2、updateStateByKey
(1)为SparkStreaming中每一个Key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象,更新函数也可以是自定义的。
(2)通过更新函数对该key的状态不断更新,对于每个新的batch而言,SparkStreaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新。
(3)使用到updateStateByKey要开启checkpoint机制和功能,两种方式:
(3.1)SparkContext.setCheckpointDir..
(3.2)StreamingContext.checkpoint(....)
(4)多久向checkpoint中维护状态
(4.1)如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。
(4.2)如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份。
代码示例:
3、reduceByKeyAndWindow
(3.1)窗口操作
窗口操作理解图一:
(3.1.1)每个滑动间隔 计算窗口长度内批次组成的DStream;
(3.1.2)窗口长度:window length;
(3.1.3)滑动间隔:sliding interval;
(3.1.4)普通机制不用设置checkpoint,优化机制需要设置checkpoint;
窗口操作理解图二:
假设每隔5s 1个batch,上图中窗口长度为15s,窗口滑动间隔10s。
窗口长度和滑动间隔必须是batchInterval的整数倍,如果不是整数倍会检测报错。
(3.2)优化后的window窗口操作示意图:
优化后的window操作要保存状态所以要设置checkpoint路径,没有优化的window操作可以不设置checkpoint路径。
示例代码:
参考:
Spark
