大数据笔记(三十一)——SparkStreaming详细介绍


Spark Streaming: Spark用于处理流式数据的模块,类似Storm

核心:DStream(离散流),就是一个RDD
============================================
一、Spark Streaming基础
1、什么是Spark Streaming?
(*)Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
(*)常见的流式处理框架
(1)Apache Storm
(2)Spark Streaming
(3)JStorm:阿里巴巴
(4)Flink:可以很好的管理内存

(*)离线计算和流式计算各自的特点
                   典型代表                 数据的采集          数据源(结果)
离线计算: MR、Spark Core         Sqoop                批量操作
流式计算: Storm等等                   Flume(Kafka) 实时性

(*)典型的流式计算的框架:参考Hadoop的课件:P91

2、简介Spark Streaming内部结构
技术分享图片

3、演示Demo:NetworkWordCount 处理的是流式数据
(*)工具:netcat
(*)文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example
(*)步骤:启动两个窗口
第一个窗口中:

bin/run-example streaming.NetworkWordCount bigdata11 9999

第二个窗口中:启动消息服务器(先启动)

nc -l -p 9999


注意:如果要演示成功,保证虚拟机的CPU的核数至少2以上
技术分享图片

 

运行:

技术分享图片

 

 

4、开发自己的NetworkWordCount程序

技术分享图片

 1 package main.scala.demo
 2 
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.storage.StorageLevel
 5 import org.apache.spark.streaming.{Seconds, StreamingContext}
 6 
 7 /**
 8   * Created by YOGA on 2018/2/27.
 9   */
10 object MyNetworkWordCount {
11   def main(args: Array[String]) {
12     //核心:通过StreamingContext对象,去创建一个DStream
13     //DStream从外部接收数据(使用的是Linux上的netcat工具)
14 
15     //创建一个SparkConf对象
16     //local[2]:相当于有两个工作线程,一个接收一个发送
17     val sparkconf = new SparkConf()
18                     .setAppName("MyNetworkWordCount")
19                       .setMaster("local[2]")
20 
21     //创建StreamContext,表示每隔三秒采集一次数据
22     val ssc = new StreamingContext(sparkconf,Seconds(3))
23 
24     //创建DStream,看成一个输入流
25   //IP,端口,缓存到硬盘
26 
27     val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER)
28 
29     //执行WordCount
30     val words = lines.flatMap(_.split(" "))
31 
32     //使用transform完成同样的计数,相当于map操作
33     //val wordPair = words.transform(x=>x.map(x=>(x,1)))
34     //val wordCount = wordPair.reduceByKey(_+_)
35     val wordCount = words.map((_,1)).reduceByKey(_+_)
36 
37     /*
38     * 参数一:执行运算
39     * 参数二:窗口的大小
40     * 参数三:创建滑动的距离
41     *
42     * 例子:每9秒钟,把过去30秒的数据进行wordcount
43     * 注意:第二个参数 第三个参数 必须是采样频率的整数倍
44     * */
45     //val wordCount = words.map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(9))
46     //输出
47     wordCount.print()
48 
49     //启动StreamingContext
50     ssc.start()
51 
52     //等待计算完成
53     ssc.awaitTermination()
54   }
55 
56 }

 

二、Spark Streaming进阶

bin/spark-shell --master spark://bigdata11:7077
1、类:StreamingContext(类似:Spark Context、SQLContext)
上下文对象

创建的方式:
(1)通过SparkConf来创建

val sparkconf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")

//创建StreamingContext,表示每隔3秒采集一次数据
val ssc = new StreamingContext(sparkconf,Seconds(3))    

(2)通过SparkContext对象来创建

import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc,Seconds(3))

说明:
(1)setMaster("local[2]")
(2)当创建StreamingContext对象,内部会创建一个SparkContext对象
(3)当StreamingContext开始执行,不能添加新的任务
(4)同一个时刻上,JVM只能有一个活动的StreamingContext

2、DStream(离散流):把连续的数据流,变成不连续的离散流,表现形式就是RDD
简单来说:把连续的变成不连续的

技术分享图片

 


操作:Transformation和Action
? (*)transform(func)
? 通过RDD-to-RDD函数作用于源DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDD

改写上面WordCount例子,屏蔽35行

//使用transform完成同样的计数,相当于map操作
33     val wordPair = words.transform(x=>x.map(x=>(x,1)))
34     val wordCount = wordPair.reduceByKey(_+_)

 

(*)?updateStateByKey(func)
可以进行累加操作。方法:设置检查点,定义一个累加功能的函数

 1 package main.scala.demo
 2 
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.storage.StorageLevel
 5 import org.apache.spark.streaming.{Seconds, StreamingContext}
 6 
 7 /**
 8   * Created by YOGA on 2018/2/28.
 9   */
10 object MyTotalNetworkWordCount {
11   def main(args: Array[String]) {
12     val sparkconf = new SparkConf()
13       .setAppName("MyNetworkWordCount")
14       .setMaster("local[2]")
15 
16     //创建StreamContext,表示每隔三秒采集一次数据
17     val ssc = new StreamingContext(sparkconf,Seconds(3))
18 
19     //注意:如果累计,在执行计算的时候,需要保持之前的状态信息
20     //设置检查点
21     ssc.checkpoint("hdfs://192.168.153.11:9000/spark/checkpoint0228")
22 
23     //创建DStream,看成一个输入流
24     val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER)
25 
26     //执行WordCount
27     val words = lines.flatMap(_.split(" "))
28 
29     //每个单词记一次数
30     val pairs = words.map((_,1))
31 
32     //定义一个函数,进行累加
33     //参数:1、当前的值 2、之前的值
34     val addFunc = (currentValues:Seq[Int],preValues:Option[Int]) =>{
35       //得到当前的值
36       val currentCount = currentValues.sum
37 
38       //先得到之前的值
39       val preCount = preValues.getOrElse(0)
40 
41       //返回累加结果
42       Some(currentCount + preCount)
43     }
44 
45     //统计每个单词出现的频率:累计
46     val totalCount = pairs.updateStateByKey(addFunc)
47     totalCount.print()
48 
49     //启动任务
50     ssc.start()
51     ssc.awaitTermination()
52 
53   }
54 }

 


3、窗口操作

技术分享图片

技术分享图片

 

例子:每9秒钟,把过去30秒的数据进行WordCount
注释上面的代码35行,放开下面一行代码

/*
38     * 参数一:执行运算
39     * 参数二:窗口的大小
40     * 参数三:创建滑动的距离
41     *
42     * 例子:每9秒钟,把过去30秒的数据进行wordcount
43     * 注意:第二个参数 第三个参数 必须是采样频率的整数倍,采样频率3s
44     * */
45     val wordCount = words.map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(9))

4、输入和输出
(1)输入:接收器接收外部数据源的数据
(*)基本数据源:文件流、RDD队列流、Socket流
(*)高级数据源:Kafka、Flume
文件流:监听一个目录,当目录下的文件发生变化的时候,将变化的数据读入DStream

package main.scala.demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by YOGA on 2018/2/28.
  */
object MyFileDStream {
  def main(args: Array[String]) {
    //创建一个SparkConf对象
    //local[2]:相当于有两个工作线程,一个接收一个发送
    val sparkconf = new SparkConf()
      .setAppName("MyNetworkWordCount")
      .setMaster("local[2]")

    //创建StreamContext,表示每隔三秒采集一次数据
    val ssc = new StreamingContext(sparkconf,Seconds(3))

      //监听一个目录,当目录下的文件发生变化的时候,将变化的数据读入DStream
    val lines = ssc.textFileStream("D:\\temp\\aaa")

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

 

RDD队列流queueStream

:定义一个for循环,生成RDD放入队列

package main.scala.demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD
/**
  * Created by YOGA on 2018/2/28.
  */
object MyRDDQueueDStream {
  def main(args: Array[String]){
    val sparkconf = new SparkConf()
      .setAppName("MyNetworkWordCount")
      .setMaster("local[2]")

    //创建StreamContext,表示每隔三秒采集一次数据
    val ssc = new StreamingContext(sparkconf,Seconds(3))

    //创建一个队列,把生成RDD放入队列
    val rddQueue = new mutable.Queue[RDD[Int]]()
    //初始化
    for(i <- 1 to 3){
      rddQueue += ssc.sparkContext.makeRDD(1 to 10)

      //让线程睡几秒
      Thread.sleep(3000)

    }

    //创建一个RDD的DStream
    val inputStream = ssc.queueStream(rddQueue)
    //处理:乘以10
    val result = inputStream.map(x=> (x,x*10))
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

运行:

技术分享图片

 


(2)输出操作

技术分享图片

 


5、集成DataFrame和SQL: 使用SparkSQL的方式处理流式数据

把RDD转换成DataFrame,并生成临时表,然后就可以进行SQL查询

 1 package main.scala.demo
 2 
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.sql.SparkSession
 5 import org.apache.spark.storage.StorageLevel
 6 import org.apache.spark.streaming.{Seconds, StreamingContext}
 7 
 8 /**
 9   * Created by YOGA on 2018/2/28.
10   */
11 object MyNetWorkWordCountBySQL {
12   def main(args: Array[String]) {
13     //核心:通过StreamingContext对象,去创建一个DStream
14     //DStream从外部接收数据(使用的是Linux上的netcat工具)
15 
16     //创建一个SparkConf对象
17     //local[2]:相当于有两个工作线程,一个接收一个发送
18     val sparkconf = new SparkConf()
19       .setAppName("MyNetworkWordCount")
20       .setMaster("local[2]")
21 
22     //创建StreamContext,表示每隔三秒采集一次数据
23     val ssc = new StreamingContext(sparkconf,Seconds(3))
24 
25     //创建DStream,看成一个输入流
26     val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER)
27 
28     //得到的所有单词
29     val words = lines.flatMap(_.split(" "))
30     //val wordPair = words.transform(x=> x.map(x=>(x,1)))
31     //val wordCount = wordPair.reduceByKey(_+_)
32 
33     //使用sparkSQL处理Spark Streaming的数据
34     words.foreachRDD(rdd =>{
35       //使用SparkSession来创建
36       val spark = SparkSession.builder()
37                     .config(rdd.sparkContext.getConf)
38                     .getOrCreate()
39 
40       //需要把RDD转成一个DataFrame
41       import spark.implicits._
42       val wordCountDF = rdd.toDF("word")
43 
44       //注册成一个表
45       wordCountDF.createOrReplaceTempView("words")
46 
47       //执行SQL
48       val result = spark.sql("select * from words group by word")
49       result.show()
50 
51       Thread.sleep(5000)
52     })
53 
54 
55     //启动StreamingContext
56     ssc.start()
57 
58     //等待计算完成
59     ssc.awaitTermination()
60   }
61 }
文章来自:https://www.cnblogs.com/lingluo2017/p/8708600.html
© 2021 jiaocheng.bubufx.com  联系我们
ICP备案:鲁ICP备09046678号-3