Spark SQL 类似于Hive

一、Spark SQL 基础

1、什么是Spark SQL

Spark SQL is Apache Spark’s module for working with structured data.
Spark SQL 是spark 的一个模块。来处理 结构化 的数据
不能处理非结构化的数据

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

特点:
1)容易集成
不需要单独安装

2)统一的数据访问方式
结构化数据的类型:JDBC JSon Hive parquer文件 都可以作为Spark SQL 的数据源
对接多种数据源,且使用方式类似

3)完全兼容hive
把Hive中的数据,读取到Spark SQL中运行

4)支持标准的数据连接
JDBC

2、为什么学习Spark SQL

执行效率比Hive高

hive 2.x 执行引擎可以使用 Spark

3、核心概念:表(DataFrame DataSet)

mysql中的表:表结构、数据
DataFrame:Schema、RDD(数据)

DataSet 在spark1.6以后,对DataFrame做了一个封装

4、创建DataFrame

(*)测试数据:员工表、部门表
第一种方式:使用case class
1)定义Schema
样本类来定义Schema

case class 特点:
可以支持模式匹配,使用case class建立表结构

7521, WARD, SALESMAN,7698, 1981/2/22, 1250, 500, 30

case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)

2)读取文件
val lines = sc.textFile(“/root/hd/tmp_files/emp.csv”).map(_.split(“,”))

3)把每行数据,映射到Emp上
val allEmp = lines.map(x => Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

4)生成DataFrame
val df1 = allEmp.toDF

df1.show

第二种方式 使用Spark Session
(1)什么是Spark Session
Spark session available as ‘spark’.
2.0以后引入的统一访问方式。可以访问所有的Spark组件

def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame

(2)使用StructType来创建Schema

val struct =
StructType(
StructField(“a”, IntegerType, true) ::
StructField(“b”, LongType, false) ::
StructField(“c”, BooleanType, false) :: Nil)

case class Emp(
empno:Int,
ename:String,
job:String,
mgr:Int,
hiredate:String,
sal:Int,
comm:Int,
deptno:Int)

—————–分割———————-
import org.apache.spark.sql.types._

val myschema = StructType(
List(
StructField(“empno”,DataTypes.IntegerType),
StructField(“ename”,DataTypes.StringType),
StructField(“job”,DataTypes.StringType),
StructField(“mgr”,DataTypes.IntegerType),
StructField(“hiredate”,DataTypes.StringType),
StructField(“sal”,DataTypes.IntegerType),
StructField(“comm”,DataTypes.IntegerType),
StructField(“deptno”,DataTypes.IntegerType)
))

准备数据 RDD[Row]
import org.apache.spark.sql.Row

val allEmp = lines.map(x => Row(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt))

val df2 = spark.createDataFrame(allEmp,myschema)

df2.show

第三种方式
直接读取一个带格式的文件
在/root/hd/spark-2.1.0-bin-hadoop2.7/examples/src/main/resources有现成的json代码

val df3 = spark.read 读文件,默认是Parquet文件
val df3 = spark.read.json(“/uroot/hd/tmp_files/people.json”)

df3.show

val df4 = spark.read.format(“json”).load(“/root/hd/tmp_files/people.json”)

df4.show

5、操作DataFrame

1)DSL语句
mybatis Hibernate


  
   df1.printSchema 
  
  
    
  
  
   df1.
   select(
   "ename",
   "sal").
   show 
  
  
    
  
  
   df1.
   select($
   "ename",$
   "sal",$
   "sal"+
   100).
   show 
  

$”sal” 可以看做是一个变量

查询薪水大于2000的员工
df1.filter($”sal” > 2000).show

求每个部门的员工人数
df1.groupBy($”deptno”).count.show

相当于select deptno,count(1) from emp group by deptno

2)SQL语句
注意:不能直接执行SQL,需要生成一个视图,再执行sql

scala> df1.create
createGlobalTempView createOrReplaceTempView createTempView

一般用到 createOrReplaceTempView createTempView
视图:类似于表,但不保存数据

df1.createOrReplaceTempView(“emp”)

操作:
spark.sql(“select * from emp”).show

查询薪水大于2000的员工
spark.sql(“select * from emp where sal > 2000”).show

求每个部门的员工人数
spark.sql(“select deptno,count(1) from emp group by deptno”).show

3)多表查询
10,ACCOUNTING,NEW YORK

case class Dept(deptno:Int,dname:String,loc:String)
val lines = sc.textFile(“/root/hd/tmp_files/dept.csv”).map(_.split(“,”))
val allDept = lines.map(x=>Dept(x(0).toInt,x(1),x(2)))

df5.createOrReplaceTempView(“dept”)

spark.sql(“select dname,ename from emp,dept where emp.deptno=dept.deptno”).show

6、操作DataSet

Dataset是一个分布式的数据收集器。这是在Spark1.6之后新加的一个接口,兼顾了RDD的优点(强类型,可以使用功能强大的lambda)以及Spark SQL的执行器高效性的优点。所以可以把DataFrames看成是一种特殊的Datasets,即:Dataset(Row)

Dataset跟DataFrame类似,是一套新的接口,是高级的Dataframe

举例:
1)创建DataSet
(1)使用序列来创建DataSet
定义一个case class
case class MyData(a:Int,b:String)

生成序列,并创建DataSet
val ds = Seq(MyData(1,”Tom”),MyData(2,”Merry”)).toDS

.toDS 生成DataSet

ds.show

(2)使用JSON数据来创建DataSet

定义case class
case class Person(name:String,age:BigInt)

通过Json数据来生成DataFrame
val df = spark.read.format(“json”).load(“/root/hd/tmp_files/people.json”)

将DataFrame转换成DataSet
df.as[Person].show

df.as[Person] 就是一个DataSet

(3)使用其他数据
RDD操作和DataFrame操作相结合 —> DataSet

读取数据,创建DataSet
val linesDS = spark.read.text(“/root/hd/tmp_files/test_WordCount.txt”).as[String]

对DataSet进行操作:
val words = linesDS.flatMap(.split(” “)).filter(.length > 3)

words.show
words.collect

执行一个WordCount程序
val result = linesDS.flatMap(.split(” “)).map((,1)).groupByKey( x => x._1).count
result.show

排序:


  
   result.orderBy($
   "value").show 
  
  
   result.orderBy($
   "count(1)").show 
  

2)DataSet操作案例
使用emp.json 生成一个DataFrame
val empDF = spark.read.json(“/root/hd/tmp_files/emp.json”)

查询工资大于3000的员工
empDF.where($”sal” >= 3000).show

创建case class

case class Emp(empno:BigInt,ename:String,job:String,mgr:String,hiredate:String,sal:BigInt,comm:String,deptno:BigInt)

生成DataSet
val empDS = empDF.as[Emp]

查询工资大于3000的员工
empDS.filter(_.sal > 3000).show

查询10号部门的员工
empDS.filter(_.deptno == 10).show

3)多表查询
(1)创建部门表
val deptRDD = sc.textFile(“/root/hd/tmp_files/dept.csv”).map(_.split(“,”))
case class Dept(deptno:Int,dname:String,loc:String)

val deptDS = deptRDD.map( x=> Dept(x(0).toInt,x(1),x(2))).toDS

(2)创建员工表
case class Emp(empno:Int,ename:String,job:String,mgr:Int,hiredate:String,sal:Int,comm:Int,deptno:Int)
val empRDD = sc.textFile(“/root/hd/tmp_files/emp.csv”).map(_.split(“,”))

7369,SMITH,CLERK,7902,1980/12/17,800,0,20
val empDS = empRDD.map(x=> Emp(x(0).toInt,x(1),x(2),x(3).toInt,x(4),x(5).toInt,x(6).toInt,x(7).toInt)).toDS

(3)执行多表查询:等值连接
val result = deptDS.join(empDS,”deptno”)
result.show
result.printSchema

val result1 = deptDS.joinWith(empDS, deptDS(“deptno”) === empDS(“deptno”) )
result1.show
result1.printSchema

join 和 joinWith 区别:连接后schema不同

join :将两张表展开成一张更大的表
joinWith :把两张表的数据分别做成一列,然后直接拼在一起

4)多表连接后再筛选
deptDS.join(empDS,”deptno”).where(“deptno == 10”).show

result.explain:执行计划

7、Spark SQL 中的视图

视图是一个虚表,不存储数据
两种类型:
1)普通视图(本地视图)
只在当前Session中有效createOrReplaceTempView createTempView

2)全局视图
createGlobalTempView
在不同的Session中都有用,把全局视图创建在命名空间中:global_temp中。类似于一个库

scala> df1.create
createGlobalTempView createOrReplaceTempView createTempView

举例:
创建一个新session,读取不到emp视图,报错
df1.createOrReplaceTempView(“emp”)
spark.sql(“select * from emp”).show
spark.newSession.sql(“select * from emp”)

以下两种方式均可读到全局视图中的数据
df1.createGlobalTempView(“emp1”)

spark.newSession.sql(“select * from global_temp.emp1”).show

spark.sql(“select * from global_temp.emp1”).show

二、使用数据源

在Spark SQL中,可以使用各种各样的数据源来操作。 结构化

1、使用load函数、save函数

load函数是加载数据,save是存储数据

注意:使用load 或 save时,默认是Parquet文件。列式存储文件

举例:
读取 users.parquet 文件
val userDF = spark.read.load(“/root/hd/tmp_files/users.parquet”)

userDF.printSchema
userDF.show

val userDF = spark.read.load(“/root/hd/tmp_files/emp.json”)

保存parquet文件


  
   userDF.select($
   "name",$
   "favorite_color")
   .write
   .save(
   "/root/hd/tmp_files/parquet") 
  

读取刚刚写入的文件:
val userDF1 = spark.read.load(“/root/hd/tmp_files/parquet/part-00000-f9a3d6bb-d481-4fc9-abf6-5f20139f97c5.snappy.parquet”)—> 不推荐

生产中直接读取存放的目录即可:
val userDF2 = spark.read.load(“/root/hd/tmp_files/parquet”)

读json文件 必须format
val userDF = spark.read.format(“json”).load(“/root/hd/tmp_files/emp.json”)
val userDF3 = spark.read.json(“/root/hd/tmp_files/emp.json”)

关于save函数
调用save函数的时候,可以指定存储模式,追加、覆盖等等
userDF.write.save(“/root/hd/tmp_files/parquet”)

userDF.write.save(“/root/hd/tmp_files/parquet”)
org.apache.spark.sql.AnalysisException: path file:/root/hd/tmp_files/parquet already exists.;

save的时候覆盖
userDF.write.mode(“overwrite”).save(“/root/hd/tmp_files/parquet”)

将结果保存成表
userDF.select($”name”).write.saveAsTable(“table1”)

scala> userDF.select($”name”).write.saveAsTable(“table1”)

scala> spark.sql(“select * from table1”).show
+——+
| name|
+——+
|Alyssa|
| Ben|
+——+

2、Parquet文件

列式存储文件,是Spark SQL 默认的数据源
就是一个普通的文件

举例:
1)把其他文件,转换成Parquet文件
调用save函数
把数据读进来,再写出去,就是Parquet文件

val empDF = spark.read.json(“/root/hd/tmp_files/emp.json”)
empDF.write.mode(“overwrite”).save(“/root/hd/tmp_files/parquet”)
empDF.write.mode(“overwrite”).parquet(“/root/hd/tmp_files/parquet”)

val emp1 = spark.read.parquet(“/root/hd/tmp_files/parquet”)
emp1.createOrReplaceTempView(“emp1”)
spark.sql(“select * from emp1”)

2)支持Schema的合并
项目开始 表结构简单 schema简单
项目越来越大 schema越来越复杂

举例:
通过RDD来创建DataFrame
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF(“single”,”double”)
“single”,”double” 是表结构
df1.show

df1.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/key=1”)

val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF(“single”,”triple”)
df2.show
df2.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/key=2”)

合并两个部分
val df3 = spark.read.parquet(“/root/hd/tmp_files/test_table”)

val df3 = spark.read.option(“mergeSchema”,true).parquet(“/root/hd/tmp_files/test_table”)

key是可以随意取名字的,两个key需要一致,不然合并会报错

通过RDD来创建DataFrame
val df1 = sc.makeRDD(1 to 5).map( i => (i,i*2)).toDF(“single”,”double”)
“single”,”double” 是表结构
df1.show

df1.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/kt=1”)

val df2 = sc.makeRDD(6 to 10).map( i => (i,i*3)).toDF(“single”,”triple”)
df2.show
df2.write.mode(“overwrite”).save(“/root/hd/tmp_files/test_table/kt=2”)

合并两个部分
val df3 = spark.read.parquet(“/root/hd/tmp_files/test_table”)

val df3 = spark.read.option(“mergeSchema”,true).parquet(“/root/hd/tmp_files/test_table”)

3、json文件

读取Json文件,生成DataFrame
val peopleDF = spark.read.json(“/root/hd/tmp_files/people.json”)

peopleDF.printSchema

peopleDF.createOrReplaceTempView(“peopleView”)

spark.sql(“select * from peopleView”).show

Spark SQL 支持统一的访问接口。对于不同的数据源,读取进来,生成DataFrame后,操作完全一样

4、JDBC

使用JDBC操作关系型数据库,加载到Spark中进行分析和处理

方式一:


  
   .
   /spark-shell --master spark:/
   /hsiehchou121:7077 --jars /root
   /hd/tmp_files
   /mysql-connector-java-8.0.12.jar --driver-class-path /root
   /hd/tmp_files
   /mysql-connector-java-8.0.12.jar  
  

  
   val mysqlDF = spark.read.
   format(
   "jdbc") 
  
  
   .
   option(
   "url",
   "jdbc:mysql://192.168.116.1/company?serverTimezone=UTC&characterEncoding=utf-8") 
  
  
   .
   option(
   "driver",
   "com.mysql.cj.jdbc.Driver") 
  
  
   .
   option(
   "user",
   "root") 
  
  
   .
   option(
   "password",
   "123456") 
  
  
   .
   option(
   "dbtable",
   "emp").load 
  
  
    
  
  
   val mysqlDF = spark.read.
   format(
   "jdbc").
   option(
   "url",
   "jdbc:mysql://192.168.116.1/company?serverTimezone=UTC&characterEncoding=utf-8").
   option(
   "driver",
   "com.mysql.cj.jdbc.Driver").
   option(
   "user",
   "root").
   option(
   "password",
   "123456").
   option(
   "dbtable",
   "emp").load 
  

mysqlDF.show

问题解决:
如果遇到下面问题,就是你本机的mysql数据库没有权限给你虚拟机访问
java.sql.SQLException: null, message from server: “Host ‘hsiehchou121’ is not allowed to connect to this MySQL server”

解决方案


  
   1)进入你本机的数据库 
  
  
   mysql -u root -p 
  
  
   2)
   use mysql; 
  
  
   3)修改root用户前面的Host,改为%,意思是全部IP都能访问 
  
  
   4)
   flush 
   privileges; 
  

方式二:
定义一个Properties类


  
   import java
   .util
   .Properties 
  
  
   val mysqlProps = new Properties() 
  
  
   mysqlProps.setProperty(
   "driver",
   "com.mysql.cj.jdbc.Driver") 
  
  
   mysqlProps.setProperty(
   "user",
   "root") 
  
  
   mysqlProps.setProperty(
   "password",
   "123456") 
  
  
    
  
  
   val mysqlDF1 = spark
   .read
   .jdbc(
   "jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8",
   "emp",mysqlProps) 
  
  
    
  
  
   mysqlDF1.show 
  

5、使用Hive

比较常见
(*)spark SQL 完全兼容hive
(*)需要进行配置
拷贝一下文件到spark/conf目录下:
Hive 配置文件: hive-site.xml
Hadoop 配置文件:core-site.xml hdfs-site.xml

配置好后,重启spark

在hive的lib下和spark的jars下面增加mysql-connector-java-8.0.12.jar这边连接数据库的jar包

启动Hadoop :start-all.sh
启动 hive:


  
   hsiehchou121 
  
  
   cd hive/
   bin/ 
  
  
   ./hive --service metastore 
  
  
    
  
  
   hsiehchou122 
  
  
   cd hive/
   bin 
  
  
   ./hive 
  

hsiehchou121启动问题
java.sql.SQLSyntaxErrorException: Table ‘hive.version’ doesn’t exist
解决:去mysql数据库中的hive库下面创建version表
这里需要给本地的hive库创建下hive所必须用的表

我们去/root/hd/hive/scripts/metastore/upgrade/mysql这里面找到hive-schema-1.2.0.mysql.sql,将里面的sql语句在hive库中执行

hive-txn-schema-0.14.0.mysql.sql,这个也做好执行下,用于事务管理

显示当前所在库名字
set hive.cli.print.current.db=true;

j将emp.csv上传到hdfs中的/tmp_files/下面
hdfs dfs -put emp.csv /tmp_files

在hive中创建emp_default表


  
   hive (
   default)> 
   create table emp(empno int,ename string,job string,mgr int,hiredate string,sal int,comm int,deptno int) 
  
  
    > row format 
  
  
    > delimited fields 
  
  
    > terminated by ","; 
  
  
   hive (
   default)> load data inpath 
   '/tmp_files/emp.csv' 
   into table emp; 
  
  
   Time taken: 
   1.894 
   seconds 
  
  
   hive (default)> show tables; 
  
  
   hive (
   default)> 
   select * 
   from emp; 
  

hdfs dfs -put /root/hd/tmp_files/emp.csv /tmp_files


  
   [root
   @hsiehchou121 bin]
   # ./spark-shell --master spark://hsiehchou121:7077 
  

启动spatk时,如果出现如下错误
java.sql.SQLSyntaxErrorException: Table ‘hive.partitions’ doesn’t exist
在mysql数据库里面创建partitions表

scala> spark.sql(“select * from emp_default”).show
scala> spark.sql(“select * from default.emp_default”).show

spark.sql(“create table company.emp_4(empno Int,ename String,job String,mgr String,hiredate String,sal Int,comm String,deptno Int)row format delimited fields terminated by ‘,’”)
spark.sql(“load data local inpath ‘/root/hd/tmp_files/emp.csv’ overwrite into table company.emp_4”)

三、在IDE中开发Spark SQL

1、创建DataFrame StructType方式


  
   package day4 
  
  
    
  
  
   import org.apache.spark.sql.
   SparkSession 
  
  
   import org.apache.spark.sql.types.
   StructType 
  
  
   import org.apache.spark.sql.types.
   StructField 
  
  
   import org.apache.spark.sql.types.
   IntegerType 
  
  
   import org.apache.spark.sql.types.
   StringType 
  
  
   import org.apache.spark.sql.
   Row 
  
  
   import org.apache.log4j.
   Logger 
  
  
   import org.apache.log4j.
   Level 
  
  
    
  
  
   /** 
  
  
    * 创建DataFrame StructType方式 
  
  
    */ 
  
  
   object Demo1 { 
  
   
   def main(args: 
   Array[
   String]): 
   Unit = { 
  
  
    
  
   
   //减少Info日志的打印 
  
   
   Logger.getLogger(
   "org.apache.spark").setLevel(
   Level.
   ERROR) 
  
   
   Logger.getLogger(
   "org.eclipse.jetty.server").setLevel(
   Level.
   OFF) 
  
  
    
  
   
   //创建Spark Session对象 
  
   
   val spark = 
   SparkSession.builder().master(
   "local").appName(
   "Demo1").getOrCreate() 
  
  
    
  
   
   //从指定的地址创建RDD对象 
  
   
   val personRDD = spark.sparkContext.textFile(
   "H:\\other\\students.txt").map(_.split(
   "\t")) 
  
  
    
  
   
   //通过StructType方式指定Schema 
  
   
   val schema = 
   StructType( 
  
   
   List( 
  
   
   StructField(
   "id", 
   IntegerType), 
  
   
   StructField(
   "name", 
   StringType), 
  
   
   StructField(
   "age", 
   IntegerType))) 
  
  
    
  
   
   //将RDD映射到rowRDD上,映射到Schema上 
  
   
   val rowRDD = personRDD.map(p => 
   Row(p(
   0).toInt,p(
   1),p(
   2).toInt)) 
  
   
   val personDataFrame = spark.createDataFrame(rowRDD, schema) 
  
  
    
  
   
   //注册视图 
  
  
    personDataFrame.createOrReplaceTempView(
   "t_person") 
  
  
    
  
   
   //执行SQL语句 desc降序 asc 升序 
  
   
   val df = spark.sql(
   "select * from t_person order by age desc") 
  
  
    
  
  
    df.show 
  
  
    
  
  
    spark.stop() 
  
  
    } 
  
  
   } 
  

2、使用case class来创建DataFrame


  
   package day4 
  
  
    
  
  
   import org.apache.log4j.Logger 
  
  
   import org.apache.log4j.Level 
  
  
   import org.apache.spark.sql.SparkSession 
  
  
    
  
  
   /** 
  
  
    * 使用case class来创建DataFrame 
  
  
    */ 
  
  
   object Demo2 { 
  
   
   def main(args: Array[String]): Unit = { 
  
  
    
  
  
    //减少Info日志的打印 
  
  
    Logger
   .getLogger("org
   .apache
   .spark")
   .setLevel(Level
   .ERROR) 
  
  
    Logger
   .getLogger("org
   .eclipse
   .jetty
   .server")
   .setLevel(Level
   .OFF) 
  
  
    
  
  
    //创建Spark Session对象 
  
  
    val spark = SparkSession
   .builder()
   .master("local")
   .appName("Demo1")
   .getOrCreate() 
  
  
    
  
  
    //从指定的地址创建RDD对象 
  
  
    val lineRDD = spark
   .sparkContext
   .textFile("H:\\other\\students
   .txt")
   .map(_
   .split("\t")) 
  
  
    
  
  
    //把数据与case class做匹配 
  
  
    val studentRDD = lineRDD
   .map(x => Student(x(0)
   .toInt,x(1),x(2)
   .toInt)) 
  
  
    
  
  
    //生成DataFrame 
  
  
    import spark
   .sqlContext
   .implicits
   ._ 
  
  
    val studentDF = studentRDD
   .toDF() 
  
  
    
  
  
    //注册视图,执行SQL 
  
  
    studentDF
   .createOrReplaceTempView("student") 
  
  
    
  
  
    spark
   .sql("select * from student")
   .show 
  
  
    
  
  
    spark
   .stop() 
  
  
    } 
  
  
   } 
  
  
    
  
  
   //定义case class 
  
  
   case class Student(stuId:Int, stuName:String, stuAge:Int) 
  

3、写入mysql


  
   package day4 
  
  
    
  
  
   import org.apache.log4j.
   Logger 
  
  
   import org.apache.log4j.
   Level 
  
  
   import org.apache.spark.sql.
   SparkSession 
  
  
   import org.apache.spark.sql.types.
   IntegerType 
  
  
   import org.apache.spark.sql.types.
   StringType 
  
  
   import org.apache.spark.sql.
   Row 
  
  
   import org.apache.spark.sql.types.
   StructType 
  
  
   import org.apache.spark.sql.types.
   StructField 
  
  
   import java.util.
   Properties 
  
  
    
  
  
   /** 
  
  
    * 写入mysql 
  
  
    */ 
  
  
   object Demo3 { 
  
   
   def main(args: 
   Array[
   String]): 
   Unit = { 
  
  
    
  
   
   //减少Info日志的打印 
  
   
   Logger.getLogger(
   "org.apache.spark").setLevel(
   Level.
   ERROR) 
  
   
   Logger.getLogger(
   "org.eclipse.jetty.server").setLevel(
   Level.
   OFF) 
  
  
    
  
   
   //创建Spark Session对象 
  
   
   val spark = 
   SparkSession.builder().master(
   "local").appName(
   "Demo1").getOrCreate() 
  
  
    
  
   
   //从指定的地址创建RDD对象 
  
   
   val lineRDD = spark.sparkContext.textFile(
   "H:\\other\\students.txt").map(_.split(
   "\t")) 
  
  
    
  
   
   //通过StructType方式指定Schema 
  
   
   val schema = 
   StructType( 
  
   
   List( 
  
   
   StructField(
   "personID", 
   IntegerType), 
  
   
   StructField(
   "personName", 
   StringType), 
  
   
   StructField(
   "personAge", 
   IntegerType))) 
  
  
    
  
   
   //将RDD映射到rowRDD上,映射到Schema上 
  
   
   val rowRDD = lineRDD.map(p => 
   Row(p(
   0).toInt,p(
   1),p(
   2).toInt)) 
  
   
   val personDataFrame = spark.createDataFrame(rowRDD, schema) 
  
  
    
  
  
    personDataFrame.createOrReplaceTempView(
   "myperson") 
  
  
    
  
   
   val result = spark.sql(
   "select * from myperson") 
  
  
    
  
  
    result.show 
  
  
    
  
   
   //把结果存入mysql中 
  
   
   val props = 
   new 
   Properties() 
  
  
    props.setProperty(
   "user", 
   "root") 
  
  
    props.setProperty(
   "password", 
   "123456") 
  
  
    props.setProperty(
   "driver", 
   "com.mysql.cj.jdbc.Driver") 
  
  
    
  
  
    result.write.mode(
   "append").jdbc(
   "jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", 
   "student", props) 
  
  
    
  
  
    spark.stop() 
  
  
    } 
  
  
   } 
  

4、使用Spark SQL 读取Hive中的数据,将计算结果存入mysql


  
   package day4 
  
  
    
  
  
   import org.apache.spark.sql.
   SparkSession 
  
  
   import java.util.
   Properties 
  
  
    
  
  
   /** 
  
  
    * 使用Spark SQL 读取Hive中的数据,将计算结果存入mysql 
  
  
    */ 
  
  
   object Demo4 { 
  
   
   def main(args: 
   Array[
   String]): 
   Unit = { 
  
  
    
  
   
   //创建SparkSession 
  
   
   val spark = 
   SparkSession.builder().appName(
   "Demo4").enableHiveSupport().getOrCreate() 
  
  
    
  
   
   //执行SQL 
  
   
   val result = spark.sql(
   "select deptno,count(1) from company.emp group by deptno") 
  
  
    
  
   
   //将结果保存到mysql中 
  
   
   val props = 
   new 
   Properties() 
  
  
    props.setProperty(
   "user", 
   "root") 
  
  
    props.setProperty(
   "password", 
   "123456") 
  
  
    props.setProperty(
   "driver", 
   "com.mysql.cj.jdbc.Driver") 
  
  
    
  
  
    result.write.jdbc(
   "jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8", 
   "emp_stat", props) 
  
  
    
  
  
    spark.stop() 
  
  
    } 
  
  
   } 
  

提交任务


  
   [root@hsiehchou121 bin]# .
   /spark-submit --master spark:/
   /hsiehchou121:7077 --jars /root
   /hd/tmp_files
   /mysql-connector-java-8.0.12.jar --driver-class-path /root
   /hd/tmp_files
   /mysql-connector-java-8.0.12.jar --class day4.Demo4 /root
   /hd/tmp_files
   /Demo4.jar 
  

四、性能优化

与RDD类似

1、把内存中缓存表的数据

直接读取内存的值,来提高性能

RDD中如何缓存:
rdd.cache 或者 rdd.persist

在Spark SQL中,使用SparkSession.sqlContext.cacheTable

spark中所有context对象
1)sparkContext : SparkCore
2)sql Context : SparkSQL
3)Streaming Context :SparkStreaming

统一起来:SparkSession

操作mysql,启动spark shell 时,需要:
./spark-shell –master spark://hsiehchou121:7077 –jars /root/hd/tmp_files/mysql-connector-java-8.0.12.jar –driver-class-path /root/hd/tmp_files/mysql-connector-java-8.0.12.jar

val mysqlDF = spark.read.format(“jdbc”).option(“driver”,”com.mysql.cj.jdbc.Driver”).option(“url”,”jdbc:mysql://192.168.116.1:3306/company?serverTimezone=UTC&characterEncoding=utf-8”).option(“user”,”root”).option(“password”,”123456”).option(“dbtable”,”emp”).load

mysqlDF.show
mysqlDF.createOrReplaceTempView(“emp”)

spark.sqlContext.cacheTable(“emp”) —-> 标识这张表可以被缓存,数据还没有真正被缓存
spark.sql(“select * from emp”).show —-> 依然读取mysql
spark.sql(“select * from emp”).show —-> 从缓存中读取数据

spark.sqlContext.clearCache

清空缓存后,执行查询,会触发查询mysql数据库

2、了解性能优化的相关参数

将数据缓存到内存中的相关优化参数
spark.sql.inMemoryColumnarStorage.compressed
默认为 true
Spark SQL 将会基于统计信息自动地为每一列选择一种压缩编码方式

spark.sql.inMemoryColumnarStorage.batchSize
默认值:10000
缓存批处理大小。缓存数据时, 较大的批处理大小可以提高内存利用率和压缩率,但同时也会带来 OOM(Out Of Memory)的风险

其他性能相关的配置选项(不过不推荐手动修改,可能在后续版本自动的自适应修改)
spark.sql.files.maxPartitionBytes
默认值:128 MB
读取文件时单个分区可容纳的最大字节数

spark.sql.files.openCostInBytes
默认值:4M
打开文件的估算成本, 按照同一时间能够扫描的字节数来测量。当往一个分区写入多个文件的时候会使用。高估更好, 这样的话小文件分区将比大文件分区更快 (先被调度)

spark.sql.autoBroadcastJoinThreshold
默认值:10M
用于配置一个表在执行 join 操作时能够广播给所有 worker 节点的最大字节大小。通过将这个值设置为 -1 可以禁用广播。注意,当前数据统计仅支持已经运行了 ANALYZE TABLE COMPUTE STATISTICS noscan 命令的 Hive Metastore 表

spark.sql.shuffle.partitions
默认值:200
用于配置 join 或聚合操作混洗(shuffle)数据时使用的分区数

扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄