环境
  虚拟机:VMware 10
  Linux版本:CentOS-6.5-x86_64
  客户端:Xshell4
  FTP:Xftp4
  jdk1.8
  scala-2.10.4(依赖jdk1.8)
  spark-1.6

一、output operation算子
1、foreachRDD:必须对抽取出来的RDD执行action类算子,代码才能执行。
  (1.1)foreachRDD可以拿到DStream中的RDD
  (1.2)foreachRDD call方法内,拿到的RDD的算子外的代码在Driver端执行。可以做到动态改变广播变量

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。
package com.wjy.ss;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class SparkStreamingTest {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingTest");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");//设置日志级别  不打印一堆无用的日志
        
        //创建JavaStreamingContext  批次间隔为5秒
        JavaStreamingContext jsc = new JavaStreamingContext(sc,Durations.seconds(5));
        //监听134.32.123.101 9999端口 获取文本socket流
        JavaReceiverInputDStream<String> socketTextStream = jsc.socketTextStream("134.32.123.101", 9999);
        //接下来进行wordcount
        JavaDStream<String> words = socketTextStream.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        JavaPairDStream<String, Integer> pairWords = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word,1);
            }
        });
        JavaPairDStream<String, Integer> reduceByKey = pairWords.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        
        //print()可以加参数  表示多少时间打印一次   上面的transform只有调到print才会执行
        //默认打印在此 DStream 中生成的每个 RDD 的前十个元素。加参数前num个元素
        //reduceByKey.print();
        
        /**
         * foreachRDD 可以拿到DStream中的RDD ,对拿到的RDD可以使用RDD的transformation类算子转换,要对拿到的RDD使用action算子触发执行,否则,foreachRDD不会执行。
         *  foreachRDD 中call方法内,拿到的RDD的算子外,代码是在Driver端执行。可以使用这个算子实现动态改变广播变量。
         * 
         */
        reduceByKey.foreachRDD(new VoidFunction<JavaPairRDD<String,Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(JavaPairRDD<String, Integer> rdd)
                    throws Exception 
            {
                //Driver端执行
                System.out.println("Driver .......");
                //获取SparkContext
                SparkContext context = rdd.context();
                //创建JavaSparkContext
                JavaSparkContext javaSparkContext = new JavaSparkContext(context);
                //广播变量  这里可以读取一个文件 文件内容可变  就达到了动态改变广播变量的目的
                Broadcast<String> broadcast = javaSparkContext.broadcast("hello wjy");
                String value = broadcast.value();
                System.out.println(value);
                JavaPairRDD<String, Integer> mapToPair = rdd.mapToPair(new PairFunction<Tuple2<String,Integer>, String,Integer>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> tuple) throws Exception {
                        System.out.println("Executor .......");
                        return new Tuple2<String, Integer>(tuple._1+"~",tuple._2);
                    }
                });
                mapToPair.foreach(new VoidFunction<Tuple2<String,Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Integer> arg0)
                            throws Exception {
                        System.out.println(arg0);
                    }
                });
            }
        });
        
/*
* 
* 执行结果:
Driver .......
hello wjy
Driver .......
hello wjy
19/04/22 17:26:46 WARN BlockManager: Block input-0-1555925206600 replicated to only 0 peer(s) instead of 1 peers
19/04/22 17:26:47 WARN BlockManager: Block input-0-1555925206800 replicated to only 0 peer(s) instead of 1 peers
19/04/22 17:26:47 WARN BlockManager: Block input-0-1555925207200 replicated to only 0 peer(s) instead of 1 peers
Driver .......
hello wjy
Executor .......
(ee~,1)
Executor .......
(aa~,1)
Executor .......
(ll~,1)
Executor .......
(gg~,1)
Executor .......
(dd~,1)
Executor .......
(hh~,1)
Executor .......
(wjy~,1)
Executor .......
(kk~,1)
Executor .......
(jj~,1)
Executor .......
(ii~,1)
Executor .......
(hello~,31)
Executor .......
(tt~,1)
Executor .......
(ff~,1)
Executor .......
(bb~,1)
Executor .......
(world~,17)
Executor .......
(cc~,1)
Driver .......
hello wjy
*/
        
        //启动
        jsc.start();
        //监控:等待中断
        jsc.awaitTermination();
        //这里stop不会执行  我们在实际应用中根据触发条件执行 比如监控某个文件删除后就执行停止动作
        jsc.stop();
    }
}

 


2、print

 

3、saveAsTextFiles

4、saveAsObjectFiles

5、saveAsHadoopFiles

二、transformation类算子
1、transform
(1.1)对Dstream做RDD到RDD的任意操作。
(1.2)transform call方法内,在拿到的RDD的Transformation类算子外 代码是在Driver端执行的,可以使用transform做到动态改变广播变量

代码示例:

2、updateStateByKey
(1)为SparkStreaming中每一个Key维护一份state状态,state类型可以是任意类型的,可以是一个自定义的对象,更新函数也可以是自定义的。
(2)通过更新函数对该key的状态不断更新,对于每个新的batch而言,SparkStreaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新。
(3)使用到updateStateByKey要开启checkpoint机制和功能,两种方式:
  (3.1)SparkContext.setCheckpointDir..
  (3.2)StreamingContext.checkpoint(....)
(4)多久向checkpoint中维护状态
  (4.1)如果batchInterval设置的时间小于10秒,那么10秒写入磁盘一份。
  (4.2)如果batchInterval设置的时间大于10秒,那么就会batchInterval时间间隔写入磁盘一份。

代码示例:

3、reduceByKeyAndWindow
(3.1)窗口操作
窗口操作理解图一:

【SparkStreaming学习之二】 SparkStreaming算子操作 随笔 第1张

(3.1.1)每个滑动间隔 计算窗口长度内批次组成的DStream;
(3.1.2)窗口长度:window length;
(3.1.3)滑动间隔:sliding interval;
(3.1.4)普通机制不用设置checkpoint,优化机制需要设置checkpoint;

窗口操作理解图二:

【SparkStreaming学习之二】 SparkStreaming算子操作 随笔 第2张
假设每隔5s 1个batch,上图中窗口长度为15s,窗口滑动间隔10s。
窗口长度和滑动间隔必须是batchInterval的整数倍,如果不是整数倍会检测报错。

(3.2)优化后的window窗口操作示意图:

【SparkStreaming学习之二】 SparkStreaming算子操作 随笔 第3张
优化:只计算新加进来的批次(加)和出去的批次(减):

优化后的window操作要保存状态所以要设置checkpoint路径,没有优化的window操作可以不设置checkpoint路径。

 示例代码:

 


参考:
Spark

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄