1.RDD中的函数传递

自己定义一些RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。

传递一个方法

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。
class Search(query: String){ // extends Serializable
  //过滤出包含字符串的数据
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  //过滤出包含字符串的RDD
  def getMatch1(rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)  
  }
  //过滤出包含字符串的RDD
  def getMatch2(rdd: RDD[String]): RDD[String] = {
    val str: String = this.query   //将类变量赋值给局部变量str,即可序列化;
    rdd.filter(x => x.contains(str))

  }
}

 

object TestSearch {
  def main(args: Array[String]): Unit = {
    //初始化配置信息以及 sc
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd = sc.makeRDD(List("kris", "Baidu", "Google")) //创建一个RDD
    val search = new Search("ris")  //创建一个search对象
    println("===============")
    //运用第一个过滤函数并打印结果;
    val res: RDD[String] = search.getMatch1(rdd) //java.io.NotSerializableException: com.atguigu.spark.Search
    //class Search(query: String) extends Serializable
    res.foreach(println(_))
  }
}

//过滤出包含字符串的RDD

  def getMatch1 (rdd: RDD[String]): RDD[String] = {

    rdd.filter(isMatch)

  }

在这个方法中所调用的方法isMatch()是定义在Search这个类中的,实际上调用的是this. isMatch(),this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。

解决方案 使类继承scala.Serializable即可。 class Search() extends Serializable{...}

传递一个属性

//初始化sc
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd = sc.makeRDD(List("kris", "Baidu", "Google"))
    val search = new Search("ris")
    println("===============")

     val res2: RDD[String] = search.getMatch2(rdd)
     res2.foreach(println(_))

    rdd.filter(x => x.contains(query))

在这个方法中所调用的方法query是定义在Search这个类中的字段,实际上调用的是this. query,this表示Search这个类的对象,程序在运行过程中需要将Search对象序列化以后传递到Executor端。

解决方案:将类变量query赋值给局部变量如上所示;

2. RDD依赖关系

 Lineage

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

scala> val x = sc.textFile("./wc.txt").flatMap(_.split(" ")).map((_,1))
x: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[104] at map at <console>:24
scala> x.toDebugString
res112: String =
(2) MapPartitionsRDD[104] at map at <console>:24 []  ## new MapPartitionsRDD(
 |  MapPartitionsRDD[103] at flatMap at <console>:24 []  ##new MapPartitionsRDD
 |  ./wc.txt MapPartitionsRDD[102] at textFile at <console>:24 []
 |  ./wc.txt HadoopRDD[101] at textFile at <console>:24 []
 
scala> x.dependencies ##可以看到它的依赖OneToOneDependency窄依赖
res113: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1af408a8)

 

scala> val y = x.reduceByKey(_+_)
y: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[105] at reduceByKey at <console>:26

scala> y.toDebugString
res114: String =
(2) ShuffledRDD[105] at reduceByKey at <console>:26 []
 +-(2) MapPartitionsRDD[104] at map at <console>:24 []
    |  MapPartitionsRDD[103] at flatMap at <console>:24 []
    |  ./wc.txt MapPartitionsRDD[102] at textFile at <console>:24 []
    |  ./wc.txt HadoopRDD[101] at textFile at <console>:24 []

scala> y.dependencies
res115: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@5b7e9456) ##宽依赖,产生shuffle

 跨节点传输数据就产生shuffle

DAG

DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。

3. 任务划分(重点)

RDD任务切分中间分为:Application、Job、Stage和Task

1)Application:初始化一个SparkContext即生成一个Application; 提交一个jar包就是Application(一个Application可以 有多个job)

2)Job:一个Action算子就会生成一个Job 

3)Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。

4)Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task。

注意:Application->Job->Stage->Task每一层都是1对n的关系。

有多少个task,由你当前stage的最后一个RDD的分区数决定

窄依赖分两种:OneToOneDependency和 RangeDependency(如两个分区union两个分区 => 四个分区)   NarrowDependency窄依赖

Union会产生窄依赖(查看源码);map也是窄依赖;  ReduceByKey是宽依赖,shuffledRDD---shufleDependency

scala> sc.makeRDD(1 to 8).map((_,1)).reduceByKey(_+_).collect
res3: Array[(Int, Int)] = Array((4,1), (6,1), (8,1), (2,1), (1,1), (7,1), (3,1), (5,1))

在宽依赖算子reduceByKey那切一刀;

SparkCore 2 随笔 第1张

scala> sc.makeRDD(1 to 8).map((_,1)).reduceByKey(_+_).map((_,1)).reduceByKey(_+_).collect
res4: Array[((Int, Int), Int)] = Array(((6,1),1), ((3,1),1), ((8,1),1), ((2,1),1), ((5,1),1), ((1,1),1), ((7,1),1), ((4,1),1))

两个reduceByKey宽依赖,分成了3个stage;

SparkCore 2 随笔 第2张

RDD分区数对应Task数;

scala> sc.makeRDD(1 to 8,4).map((_,1)).coalesce(3,false).reduceByKey(_+_).coalesce(2,false).collect
res5: Array[(Int, Int)] = Array((6,1), (3,1), (4,1), (1,1), (7,1), (8,1), (5,1), (2,1))
  4个分区==>map算子 4个分区 ==> 3个分区 reduceByKey宽依赖算子切分了两个stage coalesce产生 2个分区

可以推断得到产生了1个Application,2个stage,第一个stage产生了3个task;第二个stage产生了2个task;

SparkCore 2 随笔 第3张

3个task

SparkCore 2 随笔 第4张

2个task

SparkCore 2 随笔 第5张

 

4. 键值对RDD数据分区

Spark目前支持Hash分区和Range分区,用户也可以自定义分区,Hash分区为当前的默认分区,Spark中分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle过程属于哪个分区和Reduce的个数

注意:

(1)只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

获取RDD分区

可以通过使用RDD的partitioner 属性来获取 RDD 的分区方式。它会返回一个 scala.Option 对象, 通过get方法获取其中的值。

Hash分区

HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。

Ranger分区

HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。

RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。实现过程为:

第一步:先从整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;

第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的

 自定义分区

要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。

(1)numPartitions: Int:返回创建出来的分区数。

(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。

需求:将相同后缀的数据写入相同的文件,通过将相同后缀的数据分区到相同的分区并保存输出来实现

5. 数据读取与保存

      Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;

文件系统分为:本地文件系统、HDFS、HBASE以及数据库。

文件类数据读取与保存

Text文件

1)数据读取:textFile(String)

scala> val hdfsFile = sc.textFile("hdfs://hadoop101:9000/fruit.txt")

hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24

2)数据保存: saveAsTextFile(String)

scala> hdfsFile.saveAsTextFile("/fruitOut")

Json文件

如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。

注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。

scala> import scala.util.parsing.json.JSON
import scala.util.parsing.json.JSON

scala> sc.textFile("/opt/module/spark/spark-local/examples/src/main/resources/people.json").collect
res13: Array[String] = Array({"name":"Michael"}, {"name":"Andy", "age":30}, {"name":"Justin", "age":19})
scala> val y = sc.textFile("/opt/module/spark/spark-local/examples/src/main/resources/people.json")
y: org.apache.spark.rdd.RDD[String] = /opt/module/spark/spark-local/examples/src/main/resources/people.json MapPartitionsRDD[25] at textFile at <console>:25
scala> y.map(JSON.parseFull).collect
res19: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))

Sequence文件

 SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以调用 sequenceFile[keyClass, valueClass](path)。

注意:SequenceFile文件只针对PairRDD

scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[28] at parallelize at <console>:25
scala> rdd.saveAsSequenceFile("./output/seque")
scala> sc.sequenceFile[Int,Int]("/opt/module/spark/spark-local/output/seque").collect
res22: Array[(Int, Int)] = Array((5,6), (1,2), (3,4))

对象文件

对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调用saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。

scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at parallelize at <console>:25
scala> rdd.saveAsObjectFile("./output/object")
scala> sc.objectFile[(Int)]("/opt/module/spark/spark-local/output/object").collect
res25: Array[Int] = Array(4, 2, 3, 1)

 

文件系统类数据读取与保存

 1 HDFS

2 MySQL

支持通过Java JDBC访问关系型数据库。需要通过JdbcRDD进行,

从Mysql读取数据

    //初始化
    val conf = new SparkConf().setAppName("WorldCount").setMaster("local[*]")
    val sc = new SparkContext(conf)

    //定义连接mysql的参数
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://hadoop101:3306/rdd"
    val userName = "root"
    val password = "123456"
    //读取 //创建JdbcRDD
    val rdd: JdbcRDD[(Int, String)] = new JdbcRDD(sc, () => {
      Class.forName(driver)
      DriverManager.getConnection(url, userName, password)},
      "select * from test where id >= ? and id <= ?;",
      1, 4, 2,
      r => (r.getInt(1), r.getString(2))
    )
    println(rdd.count())
    rdd.foreach(println(_))

    sc.stop()

从rdd写入mysql

   //rdd数据输出到mysql
    //写入数据,foreachPartition是每个分区创建一个连接
    val rdd: RDD[(Int, String)] = sc.makeRDD(List((5, "Amazon")))
    rdd.foreachPartition(x => {
      Class.forName(driver)
      val conn: Connection = DriverManager.getConnection(url, userName, password)
      x.foreach(x => {
        val id: Int = x._1
        val name: String = x._2
        val statement: PreparedStatement = conn.prepareStatement("insert into test (id, name) values(?, ?)")
        statement.setInt(1, id)
        statement.setString(2, name)
        statement.execute()
      })
    } )
    sc.stop()

从HBase读取数据

由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过Hadoop输入格式访问HBase。这个输入格式会返回键值对数据,其中键的类型为org. apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为org.apache.hadoop.hbase.client.

Result。

    //初始化sc
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    //从hbase表读取数据
    val configuration: Configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103")
    configuration.set(TableInputFormat.INPUT_TABLE, "fruit")

    val rdd = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    rdd.foreach(x => {
      val cells: Array[Cell] = x._2.rawCells()
      cells.foreach(cell => {
        val rowkey: String = Bytes.toString(CellUtil.cloneRow(cell))
        val family: String = Bytes.toString(CellUtil.cloneFamily(cell))
        val column: String = Bytes.toString(CellUtil.cloneQualifier(cell))
        val value: String = Bytes.toString(CellUtil.cloneValue(cell))
        println(s"$rowkey  $family  $column  $value")
      })
    })
    sc.stop()

往HBase写入

  //初始化sc
    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    //rdd数据写入到hbase表
    val rdd: RDD[(String, String, String, String)] = sc.makeRDD(List(("1004", "info", "name", "pineapple")))
    val rdd2 = rdd.map(x => {
      val put: Put = new Put(Bytes.toBytes(x._1))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(x._4))
      (new ImmutableBytesWritable(), put)
    })

    //创建配置
    val configuration: Configuration = HBaseConfiguration.create()
    configuration.set("hbase.zookeeper.quorum", "hadoop101,hadoop102,hadoop103")
    configuration.set(TableOutputFormat.OUTPUT_TABLE, "fruit")
    //设置OutputFormat类型
    val job: Job = Job.getInstance(configuration)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])

    rdd2.saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()

累加器

累加器用来对信息进行聚合,通常在向 Spark传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。

自定义累加器

自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。实现自定义类型累加器需要继承AccumulatorV2并覆写要求的方法。

class MyAcc1 extends AccumulatorV2[Int, Int]{
  private var init = 0

  //判断是否为空
  override def isZero: Boolean = init == 0
  //复制
  override def copy(): AccumulatorV2[Int, Int] = {
    val acc: MyAcc1 = new MyAcc1
    acc.init = this.init
    acc
  }

  override def reset(): Unit = { //重置
    init = 0
  }

  override def add(v: Int): Unit = { //累加
    init += v
  }

  override def merge(other: AccumulatorV2[Int, Int]): Unit = { //合并
    init += other.value
  }

  override def value: Int = init //返回值
}

 调用自定义累加器

object TestAcc {
  def main(args: Array[String]): Unit = {
    //初始化sc
    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[Range.Inclusive] = sc.makeRDD(List((1 to 4))) //创建RDD
    val acc: MyAcc1 = new MyAcc1 //创建自定义累加器对象

    //注册累加器, 在Driver中sc
    sc.register(acc, "MyAcc1")

    rdd.foreach(x => { //在行动算子中对累加器的值进行修改
      acc.add(2)
      println(x) //Range(1, 2, 3, 4)
    })
    println(acc.value) //打印累加器的值 2
    sc.stop() //关闭SparkContext

  }
}

广播变量(调优策略)

* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.

广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,甚至是机器学习算法中的一个很大的特征向量,广播变量用起来都很顺手。 在多个并行操作中使用同一个变量,但是 Spark会为每个任务分别发送。

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(35)

scala> broadcastVar.value

res33: Array[Int] = Array(1, 2, 3)

使用广播变量的过程如下:

(1) 通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象。任何可序列化的类型都可以这么实现。

(2) 通过value属性访问该对象的值(在Java中为value()方法)。

(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

object TestBroadcast {
  def main(args: Array[String]): Unit = {
    //初始化sc
    val conf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")

    val rdd: RDD[String] = sc.makeRDD(List("kris", "alex", "smile"))
    val temp = "ris"
    sc.broadcast(temp)

    /**
      * 累加器和广播变量的区别:
      * 都是共享变量
      * 累加器只能写
      * 广播变量只能读
      */
    val result = rdd.filter(x => {
      x.contains(temp)
    })
    result.foreach(println(_)) //kris


  }
}

 

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄