1.Spark Streaming功能介绍

1)定义

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第1张

2.NC服务安装并运行Spark Streaming

1)在线安装nc命令

yum install -y nc

2)运行Spark Streaming 的WordCount

bin/run-example streaming.NetworkWordCount localhost 9999

3)把文件通过管道作为nc的输入,然后观察spark Streaming计算结果

cat test.txt | nc -lk 9999

文件具体内容

hadoop  storm   spark

hbase   spark   flume

spark   dajiangtai     spark

hdfs    mapreduce      spark

hive    hdfs    solr

spark   flink   storm

hbase   storm   es

3.Spark Streaming工作原理

1)Spark Streaming数据流处理

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第2张

2)接收器工作原理

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第3张

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第4张

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第5张

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第6张

3)综合工作原理

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第7张

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第8张

4.Spark Streaming编程模型

1)StreamingContext初始化的两种方式

#第一种

val ssc = new StreamingContext(sc, Seconds(5))

#第二种

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))

2)Spark Streaming socket代码

object NetworkWordCount {

  def main(args: Array[String]) {

    if (args.length < 2) {

      System.err.println("Usage: NetworkWordCount  ")

      System.exit(1)

    }

 

    //创建StreamingContext,每秒钟计算一次

    val sparkConf = new SparkConf().setAppName("NetworkWordCount")

    val ssc = new StreamingContext(sparkConf, Seconds(1))

 

    //监听网络端口,参数一:hostname 参数二:port 参数三:存储级别,创建了lines流

    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)

    //flatMap运算

    val words = lines.flatMap(_.split(" "))

    //map reduce 计算

    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()

    ssc.awaitTermination()

  }

}

5.Spark Streaming读取Socket流数据

1)spark-shell运行Streaming程序,要么线程数大于1,要么基于集群。

bin/spark-shell --master local[2]

bin/spark-shell --master spark://bigdata-pro01.kfk.com:7077

2)spark 运行模式

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第9张

3)Spark Streaming读取Socket流数据

a)编写测试代码,并本地运行

object TestStreaming {

  def main(args: Array[String]) {

    if (args.length < 2) {

      System.err.println("Usage: NetworkWordCount  ")

      System.exit(1)

    }

       

        val spark=SparkSession.builder().master("local[2]").setAppName("streaming").getOrCreate()

        val sc = spark.SparkContext

       

        val ssc = new StreamingContext(sc, Seconds(5))

 

    //监听网络端口,参数一:hostname 参数二:port 参数三:存储级别,创建了lines流

    val lines = ssc.socketTextStream("igdata-pro02.kfk.com", 9999, StorageLevel.MEMORY_AND_DISK_SER)

    //flatMap运算

    val words = lines.flatMap(_.split(" "))

    //map reduce 计算

    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()

    ssc.awaitTermination()

  }

}

b)启动nc服务发送数据

nc -lk 9999

6.Spark Streaming保存数据到外部系统

1)保存到mysql数据库

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第10张

2)保存到hdfs

新闻网大数据实时分析可视化系统项目——19、Spark Streaming实时数据分析 随笔 第11张

7.Spark Streaming与Kafka集成

1)Maven引入相关依赖:spark-streaming-kafka

2)编写测试代码并启动运行

object StreamingKafka8 {

 

  def main(args: Array[String]): Unit = {

 

    val spark  = SparkSession.builder()

      .master("local[2]")

      .appName("streaming").getOrCreate()

 

    val sc =spark.sparkContext;

    val ssc = new StreamingContext(sc, Seconds(5))

 

    // Create direct kafka stream with brokers and topics

    val topicsSet =Set("weblogs")

    val kafkaParams = Map[String, String]("metadata.broker.list" -> "bigdata-pro01.kfk.com:9092")

    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](

      ssc, kafkaParams, topicsSet)

 

    val lines = kafkaStream.map(x => x._2)

    val words = lines.flatMap(_.split(" "))

    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)

    wordCounts.print()

 

    ssc.start()

    ssc.awaitTermination()

  }

}

3)启动Kafka服务并测试生成数据

bin/kafka-server-start.sh config/server.properties

bin/kafka-console-producer.sh --broker-list bigdata-pro01.kfk.com --topic weblogs

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄