Spark MLlib

MLlib 是 Spark 可以扩展的机器学习库

MLlib is Apache Spark’s scalable machine learning library.

SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。

一、MLlib概述

MLlib 是 Spark 可以扩展的机器学习库

Spark在机器学习方面具有得天独厚的有事,有以下几个原因:

1、机器学习算法

一般都有多个步骤迭代计算,需要在多次迭代后,获得足够小的误差或者收敛才会停止


  
   double wucha = 
   1.0 
  
  
   while(wucha>=0.00001){ 
  
  
    建模 wucha -= 某个值 
  
  
   } 
  

模型计算完毕

当迭代使用Hadoop的MapReduce计算框架时,每次都要读写硬盘以及任务启动工作,导致很大的IO开销

而Spark基于内存的计算模型天生擅长迭代计算。只有在必要时,才会读写硬盘

所以Spark是机器学习比较理想的平台

2、通信

Hadoop的MapReduce计算框架,通过heartbeat方式来进行通信和传递数据,执行速度慢

spark 有高效的 Akka 和 Netty 的通信系统,通行效率高

Spark MLlib 是Spark 对常用的机器学习算法的实现库,同时包括相关测试和数据生成器

二、什么是机器学习

1、机器学习的定义

A computer program is said to learn from experience E with respect to some class of tasks T and performance measure P,
if its performance at tasks in T, as measured by P, improves with experience E

三个关键词:算法、经验、模型评价

在数据的基础上,通过算法构建出模型,并进行评价
如果达到要求,则用该模型测试其他数据
如果不达到要求,要调整算法来重新建立模型,再次进行评估
循环往复,知道获得满意的经验

应用:金融反欺诈、语音识别、自然语言处理、翻译、模式识别、智能控制等等

2、基于大数据的机器学习

传统的机器学习算法,由于技术和单机存储的现值,只能在少量数据上使用
即,依赖于数据抽样
问题:很难做好随机,导致学习的模型不准确

在大数据上进行机器学习,直接处理全量数据并进行大量迭代计算

Spark本身计算优势,适合机器学习

另外 spark-shell pyspark 都可以提供及时查询工具

3、MLlib

MLlib是Spark机器学习库,简化机器学习的工程实践工作,方便扩展到更大规模
集成了通用的学习算法:分类、回归、聚类、协同过滤、降维等等

另外,MLlib本身在Spark中,数据清洗、SQL、建模放在一起

sample_linear_regression_data.txt
1 1:1.9
2 1:3.1
3 1:4
3.5 1:4.45
4 1:5.02
9 1:9.97
-2 1:-0.98


  
   package day7 
  
  
    
  
  
   import org.apache.spark.sql.
   SparkSession 
  
  
   import org.apache.spark.ml.regression.
   LinearRegression 
  
  
    
  
  
   /* 
  
  
    * 1.3850645873427236 1:0.14476184437006356 2:-0.11280617018445871 3:-0.4385084538142101 4:-0.5961619435136434 5:0.419554626795412 6:-0.5047767472761191 7:0.457180284958592 8:-0.9129360314541999 9:-0.6320022059786656 10:-0.44989608519659363 
  
  
    *  
  
  
    */ 
  
  
   object Demo1 { 
  
   
   def main(args: 
   Array[
   String]): 
   Unit = { 
  
   
   val spark = 
   SparkSession.builder().appName(
   "Demo1").master(
   "local").getOrCreate() 
  
  
    
  
   
   val data_path = 
   "H:\\sample_linear_regression_data.txt" 
  
  
    
  
   
   //读取训练数据 
  
   
   val trainning = spark.read.format(
   "libsvm").load(data_path) 
  
  
    
  
   
   //定义模型 
  
   
   val lr = 
   new 
   LinearRegression().setMaxIter(
   10000) 
  
  
    
  
   
   //训练模型 
  
   
   val lrModel = lr.fit(trainning) 
  
  
    
  
   
   //获取模型训练结果 
  
   
   val trainningSummary = lrModel.summary 
  
  
    
  
   
   //获取预测值 
  
  
    trainningSummary.predictions.show() 
  
  
    
  
   
   //获取误差 
  
  
    print(trainningSummary.rootMeanSquaredError) 
  
  
    
  
  
    spark.stop() 
  
  
    } 
  
  
   } 
  

Spark Graphx

一、Spark Graphx 是什么?

1、是Spark 的一个模块,主要用于进行以图为核心的计算,还有分布式图计算

2、Graphx 底层基于RDD计算,和RDD共用一种存储形态。在展示形态上,可以用数据集来表示,也可以用图来表示

二、Spark GraphX 有哪些抽象?

1、顶点

RDD[(VertexId,VD)]表示
VertexId 代表了顶点的ID,是Long类型
VD 是顶点的属性,可以是任何类型

2、边

RDD[Edge[ED]]表示
Edge表示一个边
包含一个ED类型参数来设定属性
另外,边还包含了源顶点ID和目标顶点ID

3、三元组

三元组结构用RDD[EdgeTriplet[VD,ED]]表示
三元组包含一个边、边的属性、源顶点ID、源顶点属性、目标顶点ID、目标顶点属性

4、图

Graph表示,通过顶点和边来构建


  
   package day7 
  
  
    
  
  
   import org.apache.spark.
   SparkConf 
  
  
   import org.apache.spark.
   SparkContext 
  
  
   import org.apache.spark.graphx.
   Edge 
  
  
   import org.apache.spark.graphx.
   Graph 
  
  
    
  
  
   object Demo2 { 
  
   
   def main(args: 
   Array[
   String]): 
   Unit = { 
  
   
   val conf = 
   new 
   SparkConf().setAppName(
   "Demo2").setMaster(
   "local") 
  
  
    
  
   
   //创建Spark Context对象 
  
   
   val sc = 
   new 
   SparkContext(conf) 
  
  
    
  
   
   //定义点 
  
   
   val users = sc.parallelize(
   Array((
   3L,(
   "TIme",
   "student")),(
   5L,(
   "Andy",
   "student")), 
  
  
    (
   7L,(
   "Mary",
   "student")),(
   2L,(
   "Lily",
   "post")))) 
  
  
    
  
   
   //定义边 
  
   
   val relationship = sc.parallelize(
   Array(
   Edge(
   3L,
   7L,
   "col"),
   Edge(
   5L,
   3L,
   "ad"),
   Edge(
   2L,
   5L,
   "col"),
   Edge(
   5L,
   7L,
   "heh"))) 
  
  
    
  
   
   //构建图 
  
   
   val graph = 
   Graph(users, relationship) 
  
  
    
  
   
   //图的操作 
  
   
   val post_count = graph.vertices.filter{ 
   case (id,(name,pos)) => pos==
   "post"}.count 
  
  
    
  
  
    println(
   "post count is " + post_count) 
  
  
    
  
   
   val edges_count = graph.edges.filter(e => e.srcId > e.dstId).count() 
  
  
    
  
  
    println(
   "the value is " + edges_count) 
  
  
    } 
  
  
   } 
  
扫码关注我们
微信号:SRE实战
拒绝背锅 运筹帷幄