sparkstreaming统计(sparkstream实时计算结果)

技术SparkStreaming算子开发实例分析本篇文章为大家展示了SparkStreaming算子开发实例分析,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Spark Stre

本文给大家展示一个SparkStreaming运算符开发的实例分析,简洁易懂,绝对能让你大放异彩。希望通过这篇文章的详细介绍,你能有所收获。

Spark Streaming算子开发实例

transform算子开发

当应用于数据流时,变换操作可以用来执行任何RDD-RDD转换操作,也可以用来实现数据流API中没有提供的操作。比如,DStream API没有提供将一个DStream中的每一个批次与特定的RDD进行连接的操作,DStream中的连接操作符只能连接其他的DStream,但是我们可以自己使用转换操作来实现这个功能。

示例:黑名单用户实时过滤

package streaming demo import org . Apache . log4j . { Level,Logger } import org . Apache . spark . sparkconfimport org . Apache . spark . streaming . { Seconds,StreamingContext}

/* * *实时黑名单过滤*/对象转换演示{ def main(args : array[string]): unit={ 0

//设置日志级别Logger.getLogger('org ')。设置级别(级别。WARN) val conf=new SparkConf()。setAppName(this . getclass . getsimplename)。setMaster(' local[2]')val SSC=new streaming context(conf,Seconds(2))

//创建RDD瓦尔布莱克的黑名单

//通过套接字VallineSD stream=SSC从nc获取数据。SocketTextstream ('Hadoop 01 ',6666)

/* * *黑名单过滤用户语音* zs sb sb sb * Lisi操操操* Jack hello */linesdstream . map(x={ valinfo=x . split(')(info(0),info . tolist . tail . MK string('))})。改变

//transform是RDD-RDD运算,所以返回值一定是RDD。

/* * *在leftouterjoin操作之后,结果如下:* (zs,(sb sb sb sb),some (true)) * (lisi,(操他妈的操),some (true)) * (jack,(hello,none)) */val join RDD=

//如果是Some(真),表示是黑名单用户;如果是None,则表示没有被列入黑名单。保留未列入黑名单的用户。val filter rdd=join rdd . filter(x=x . 2 . isempty)filter rdd })。map(x=(x . 1,x . 2。_1)).print()(SSC . start())SSC . awaIttermination()} }

试验

启动nc,传入用户及其语音信息。

可以看到程序实时过滤掉黑名单中用户的语音。

updateStateByKey算子开发

updateStateByKey操作符可以保持任何状态,同时不断更新新信息。该操作员可以为每个键维护一个状态,并持续更新该状态。对于每个批次,火花将用于每个

个之前已经存在的key去应用一次State更新函数,无论这个key在batch中是否有新的值,如果State更新函数返回的值是none,那么这个key对应的state就会被删除;对于新出现的key也会执行state更新函数。

要使用该算子,必须进行两个步骤

定义state——state可以是任意的数据类型  定义state更新函数——用一个函数指定如何使用之前的状态,以及从输入流中获取新值更新状态

注意:updateStateByKey操作,要求必须开启Checkpoint机制

实例:基于缓存的实时WordCount

package StreamingDemoimport org.apache.log4j.{

Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}

/** * 基于缓存的实时WordCount,在全局范围内统计单词出现次数 */object UpdateStateByKeyDemo { def main(args: Array[String]): Unit = {  

//设置日志级别  Logger.getLogger("org").setLevel(Level.WARN)  

/**   * 如果没有启用安全认证或者从Kerberos获取的用户为null,那么获取HADOOP_USER_NAME环境变量,   * 并将它的值作为Hadoop执行用户设置hadoop username   * 这里实验了一下在没有启用安全认证的情况下,就算不显式添加,也会自动获取我的用户名   */  //System.setProperty("HADOOP_USER_NAME","Setsuna")  val conf = new SparkConf()   .setAppName(this.getClass.getSimpleName)   .setMaster("local[2]")  val ssc = new StreamingContext(conf, Seconds(2))  

//设置Checkpoint存放的路径  ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")  

//创建输入DStream  val lineDStream = ssc.socketTextStream("Hadoop01", 6666)  val wordDStream = lineDStream.flatMap(_.split(" "))  val pairsDStream = wordDStream.map((_, 1))  

/**   * state:代表之前的状态值   * values:代表当前batch中key对应的values值   */  val resultDStream =   pairsDStream.updateStateByKey((values: Seq[Int], state: Option[Int]) => {    

//当state为none,表示没有对这个单词做统计,则返回0值给计数器count    var count = state.getOrElse(0)    

//遍历values,累加新出现的单词的value值    for (value <- values) {     count += value    }    

//返回key对应的新state,即单词的出现次数    Option(count)   })  

//在控制台输出  resultDStream.print()  ssc.start()  ssc.awaitTermination() }}

测试

开启nc,输入单词

控制台实时输出的结果

window滑动窗口算子开发

Spark Streaming提供了滑动窗口操作的支持,可以对一个滑动窗口内的数据执行计算操作在滑动窗口中,包含批处理间隔、窗口间隔、滑动间隔

对于窗口操作而言,在其窗口内部会有N个批处理数据  批处理数据的大小由窗口间隔决定,而窗口间隔指的就是窗口的持续时间,也就是窗口的长度  滑动时间间隔指的是经过多长时间窗口滑动一次,形成新的窗口,滑动间隔默认情况下和批处理时间间隔的相同

注意:滑动时间间隔和窗口时间间隔的大小一定得设置为批处理间隔的整数倍

用一个官方的图来作为说明

批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。对于初始的窗口time1-time3,只有窗口间隔满足了才触发数据的处理。所以滑动窗口操作都必须指定两个参数,窗口长度和滑动时间间隔。在Spark Streaming中对滑动窗口的支持是比Storm更加完善的。

Window滑动算子操作

算子      描述              window()      对每个滑动窗口的数据执行自定义的计算              countByWindow()      对每个滑动窗口的数据执行count操作              reduceByWindow()      对每个滑动窗口的数据执行reduce操作              reduceByKeyAndWindow()      对每个滑动窗口的数据执行reduceByKey操作              countByValueAndWindow()      对每个滑动窗口的数据执行countByValue操作

reduceByKeyAndWindow算子开发

实例:在线热点搜索词实时滑动统计

每隔2秒钟,统计最近5秒钟的搜索词中排名最靠前的3个搜索词以及出现次数

package StreamingDemoimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}

/** * 需求:每隔2秒钟,统计最近5秒钟的搜索词中排名最靠前的3个搜索词以及出现次数 */object ReduceByKeyAndWindowDemo { def main(args: Array[String]): Unit = {  

//设置日志级别  Logger.getLogger("org").setLevel(Level.WARN)  

//基础配置  val conf = new SparkConf()   .setAppName(this.getClass.getSimpleName)   .setMaster("local[2]")  

//批处理间隔设置为1s  val ssc = new StreamingContext(conf, Seconds(1))  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)  linesDStream   .flatMap(_.split(" ")) 

//根据空格来做分词   .map((_, 1)) 

//返回(word,1)   .reduceByKeyAndWindow(    //定义窗口如何计算的函数    

//x代表的是聚合后的结果,y代表的是这个Key对应的下一个需要聚合的值    (x: Int, y: Int) => x + y,    

//窗口长度为5秒    Seconds(5),    

//窗口时间间隔为2秒    Seconds(2)   )   .transform(rdd => { 

//transform算子对rdd做处理,转换为另一个rdd    

//根据Key的出现次数来进行排序,然后降序排列,获取最靠前的3个搜索词    val info: Array[(String, Int)] = rdd.sortBy(_._2, false).take(3)    

//将Array转换为resultRDD    val resultRDD = ssc.sparkContext.parallelize(info)    resultRDD   })   .map(x => s"${x._1}出现的次数是:${x._2}")   .print()  ssc.start()  ssc.awaitTermination() }}

测试结果

DStream Output操作概览

Spark Streaming允许DStream的数据输出到外部系统,DSteram中的所有计算,都是由output操作触发的,foreachRDD输出操作,也必须在里面对RDD执行action操作,才能触发对每一个batch的计算逻辑。

转换描述              

print()在Driver中打印出DStream中数据的前10个元素。主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job。              

saveAsTextFiles(prefix, [suffix]将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。              

saveAsObjectFiles(prefix, [suffix])将DStream中的内容按对象序列化并且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。              

saveAsHadoopFiles(prefix, [suffix])将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。              

foreachRDD(func)最基本的输出操作,将func函数应用于DStream中的RDD上,这个操作会输出数据到外部系统,比如保存RDD到文件或者网络数据库等。需要注意的是func函数是在运行该streaming      应用的Driver进程里执行的。

foreachRDD算子开发

foreachRDD是最常用的output操作,可以遍历DStream中的每个产生的RDD并进行处理,然后将每个RDD中的数据写入外部存储,如文件、数据库、缓存等,通常在其中针对RDD执行action操作,比如foreach

使用foreachRDD操作数据库

通常在foreachRDD中都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储

误区一:在RDD的foreach操作外部创建Connection

dstream.foreachRDD { rdd =>  val connection=createNewConnection()  rdd.foreach { record => connection.send(record)  }}

这种方式是错误的,这样的方式会导致Connection对象被序列化后被传输到每一个task上,但是Connection对象是不支持序列化的,所以也就无法被传输

误区二:在RDD的foreach操作内部创建Connection

dstream.foreachRDD { rdd =>  rdd.foreach { record =>    val connection = createNewConnection()    connection.send(record)    connection.close()  }}

这种方式虽然是可以的,但是执行效率会很低,因为它会导致对RDD中的每一条数据都创建一个Connection对象,通常Connection对象的创建都是很消耗性能的

合理的方式

第一种:使用RDD的foreachPartition操作,并且在该操作内部创建Connection对象,这样就相当于为RDD的每个partition创建一个Connection对象,节省了很多资源  第二种:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部从静态连接池中,通过静态方法获取到一个连接,连接使用完之后再放回连接池中。这样的话,可以在多个RDD的partition之间复用连接了

实例:实时全局统计WordCount,并将结果保存到MySQL数据库中

MySQL数据库建表语句如下

CREATE TABLE wordcount (  word varchar(100) CHARACTER SET utf8 NOT NULL,  count int(10) NOT NULL,  PRIMARY KEY (word)) ENGINE=InnoDB DEFAULT CHARSET=latin1;

在IDEA中添加mysql-connector-java-5.1.40-bin.jar

代码如下

连接池的代码,其实一开始有想过用静态块来写个池子直接获取,但是如果考虑到池子宽度不够用的问题,这样的方式其实更好,一开始,实例化一个连接池出来,被调用获取连接,当连接全部都被获取了的时候,池子空了,就再实例化一个池子出来

package StreamingDemoimport java.sql.{Connection, DriverManager, SQLException}import java.utilobject JDBCManager { var connectionQue: java.util.LinkedList[Connection] = null 

/**  * 从数据库连接池中获取连接对象  * @return  */ def getConnection(): Connection = {  synchronized({   try {    

//如果连接池是空的,那么就实例化一个Connection类型的链表    

if (connectionQue == null) {    

 connectionQue = new util.LinkedList[Connection]()     

for (i <- 0 until (10)) {      

//生成10个连接,并配置相关信息      val connection = DriverManager.getConnection(       "jdbc:mysql://Hadoop01:3306/test?characterEncoding=utf-8",       "root",       "root")      

//将连接push进连接池      connectionQue.push(connection)     }    }   } catch {    

//捕获异常并输出    case e: SQLException => e.printStackTrace()   }   

//如果连接池不为空,则返回表头元素,并将它在链表里删除   return connectionQue.poll()  }) } 

/**  * 当连接对象用完后,需要调用这个方法归还连接  * @param connection  */ def returnConnection(connection: Connection) = {  

//插入元素  connectionQue.push(connection) } def main(args: Array[String]): Unit = {  

//main方法测试  getConnection()  println(connectionQue.size()) }}

wordcount代码

package StreamingDemoimport org.apache.log4j.{Level, Logger}import org.apache.spark.{SparkConf, streaming}import org.apache.spark.streaming.{Seconds, StreamingContext}object ForeachRDDDemo { def main(args: Array[String]): Unit = {  

//设置日志级别,避免INFO信息过多  Logger.getLogger("org").setLevel(Level.WARN)  

//设置Hadoop的用户,不加也可以  System.setProperty("HADOOP_USER_NAME", "Setsuna")  

//Spark基本配置  val conf = new SparkConf()   .setAppName(this.getClass.getSimpleName)   .setMaster("local[2]")  val ssc = new StreamingContext(conf, streaming.Seconds(2))  

//因为要使用updateStateByKey,所以需要使用checkpoint  ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")  

//设置socket,跟nc配置的一样  val linesDStream = ssc.socketTextStream("Hadoop01", 6666)  val wordCountDStream = linesDStream   .flatMap(_.split(" "))   

//根据空格做分词   .map((_, 1)) 

//生成(word,1)   .updateStateByKey((values: Seq[Int], state: Option[Int]) => {    

//实时更新状态信息    var count = state.getOrElse(0)    for (value <- values) {     count += value    }    Option(count)   })  wordCountDStream.foreachRDD(rdd => {   

if (!rdd.isEmpty()) {    rdd.foreachPartition(part => {     

//从连接池中获取连接     val connection = JDBCManager.getConnection()     part.foreach(data => {      val sql = //往wordcount表中插入wordcount信息,on duplicate key update子句是有则更新无则插入       s"insert into wordcount (word,count) " +        s"values ('${data._1}',${data._2}) on duplicate key update count=${data._2}"     

 //使用prepareStatement来使用sql语句      val pstmt = connection.prepareStatement(sql)      pstmt.executeUpdate()     })     

//在连接处提交完数据后,归还连接到连接池     JDBCManager.returnConnection(connection)    })   }  })  ssc.start()  ssc.awaitTermination() }}

打开nc,输入数据

在另一个终端对wordcount的结果进行查询,可以发现是实时发生变化的

上述内容就是SparkStreaming算子开发实例分析,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/149832.html

(0)

相关推荐

  • HDFS的工作原理是什么呢

    技术HDFS的工作原理是什么呢这期内容当中小编将会给大家带来有关HDFS的工作原理是什么呢,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。Hadoop分布式文件系统(HDFS)是一种被设

    攻略 2021年12月3日
  • 让我看看英语,让我看看你的钢笔英语怎么说

    技术让我看看英语,让我看看你的钢笔英语怎么说给我看一下你的钢笔,翻译是show me your pen让我看看英语。关键词汇是动词show。 用了词组show sb sth意思是把某物展示给某人看。解释:show

    生活 2021年10月28日
  • 为什么晕车,一坐车就晕车是为什么呢

    技术为什么晕车,一坐车就晕车是为什么呢晕动病是汽车为什么晕车、轮船或飞机运动时所产生的颠簸、摇摆或旋转等任何形式的加速运动,刺激人体的前庭神经而发生的疾病。患者初时感觉上腹不适,继有恶心、面色苍白、出冷汗,旋即有眩晕、精

    生活 2021年11月1日
  • Linux DRM内核模块怎么定义

    技术Linux DRM内核模块怎么定义本篇内容主要讲解“Linux DRM内核模块怎么定义”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Linux DRM内核模块怎么定义”吧

    攻略 2021年11月23日
  • 没有配置环境变量django可以用吗(创建django时如何选择编译器)

    技术启动uwsgi报错提示找不到django的模块怎么办小编给大家分享一下启动uwsgi报错提示找不到django的模块怎么办,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获

    攻略 2021年12月18日
  • leetcode滑动窗口(leetcode滑动窗口万能公式)

    技术LeetCode怎样找出滑动窗口的最大值小编给大家分享一下LeetCode怎样找出滑动窗口的最大值,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一

    攻略 2021年12月15日