sparkstreaming数据量(使用sparkstreaming的必要性)

技术如何理解Spark Streaming的数据可靠性和一致性如何理解Spark Streaming的数据可靠性和一致性,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决

如何理解Spark Streaming数据的可靠性和一致性,相信很多没有经验的人都不知所措。因此,本文总结了出现问题的原因和解决方法,希望大家可以通过这篇文章来解决这个问题。

目前,大数据领域最热门的词汇之一就是流计算,其中最耀眼的项目无疑是来自Spark社区的Spark Streaming项目,自诞生以来就受到广泛关注,发展迅速,如今更是有了赶超Storm的架势。

毫无疑问,流计算的核心特点是其低延迟能力,这主要来自于计算数据不离开磁盘的内在机制,但也带来了数据可靠性的问题,即当出现节点故障或网络异常时,如何在节点间进行适当的协商进行重传。此外,在计划外数据重传的情况下,我们如何确保没有重复数据,并且所有数据都是精确一次?如果这些问题得不到解决,大数据的流计算将无法满足大部分企业级的可靠性要求,将名存实亡。

以下将重点分析Spark Streaming如何设计可靠性机制并实现数据一致性。

Driver HA

由于streaming计算系统运行时间长,数据不断流入,其Spark Driver的可靠性非常重要,决定了Streaming程序能否一直正确运行。

如何理解Spark  Streaming的数据可靠性和一致性

图一 Driver数据持久化

驱动程序实现高可用性的解决方案是持久化元数据,以便在重启后恢复状态。如图1所示,驱动程序保存的元数据包括:

在接收器从网络接收的数据被组装成块之后生成的Block元数据(图一中的绿色箭头):块元数据;

Checkpoint数据(图一中的橙色箭头):包括配置项目、数据流操作、未完成批处理状态和生成的RDD数据等。

如何理解Spark  Streaming的数据可靠性和一致性

图二 Driver故障恢复

驱动程序无法重新启动后:

恢复(图2中的橙色箭头):使用检查点数据重新启动驱动程序,重建上下文并重新启动接收器。

恢复元数据块(图2中的绿色箭头):恢复块元数据。

恢复未完成的作业(图2中的红色箭头):使用恢复的元数据再次生成RDD和相应的作业,然后提交给Spark集群执行。

通过上述数据备份和恢复机制,驱动程序可以在故障后重启,并且仍然可以在不丢失数据的情况下恢复流任务,从而提供系统级数据的高可靠性。

可靠的上下游IO系统

流计算主要通过网络套接字通信实现与外部IO系统的数据交互。由于网络通信的不可靠性,发送方和接收方需要一定的协议来保证数据包的接收确认和失败重传机制。

并不是所有的IO系统都支持重传,这至少需要数据流的持久性,以及高吞吐量和低延迟。在Spark Streaming官方支持的数据源中,只有Kafka能够同时满足这些要求,所以在最近的Spark Streaming发布中,Kafka也被视为推荐的外部数据系统。

除了Kafka作为入站数据源之外,它还通常被用作出站数据源。所有基于卡夫卡的实时系统都通过MQ订阅和分发数据,从而实现流数据生产者和消费者的解耦。

企业大数据中心数据流的典型视图如下:

如何理解Spark  Streaming的数据可靠性和一致性

图三 企业大数据中心数据流向视图

除了确保数据可以从源重新传输之外,卡夫卡还是流数据的精确一次语义的重要保证。Kafka提供了一个低级API,使客户端能够访问主题数据流及其元数据。Spark Streaming的每个接收任务都可以从指定的Kafka主题、分区和偏移量中获取数据流。每个任务的数据边界清晰,任务失败后可以再次接收。

这部分数据而不会产生“重叠的”数据,因而保证了流数据“有且仅处理一次”。

可靠的接收器

在Spark 1.3版本之前,Spark Streaming是通过启动专用的Receiver任务来完成从Kafka集群的数据流拉取。

Receiver任务启动后,会使用Kafka的高级API来创建topicMessageStreams对象,并逐条读取数据流缓存,每个batchInerval时刻到来时由JobGenerator提交生成一个spark计算任务。

由于Receiver任务存在宕机风险,因此Spark提供了一个高级的可靠接收器-ReliableKafkaReceiver类型来实现可靠的数据收取,它利用了Spark 1.2提供的WAL(Write Ahead Log)功能,把接收到的每一批数据持久化到磁盘后,更新topic-partition的offset信息,再去接收下一批Kafka数据。万一Receiver失败,重启后还能从WAL里面恢复出已接收的数据,从而避免了Receiver节点宕机造成的数据丢失(以下代码删除了细枝末节的逻辑):

class ReliableKafkaReceiver{  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null  override def onStart(): Unit = {    // Initialize the topic-partition / offset hash map.    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]    // Initialize the block generator for storing Kafka message.    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(      topics.values.sum, "KafkaMessageHandler")    blockGenerator.start()    val topicMessageStreams = consumerConnector.createMessageStreams(      topics, keyDecoder, valueDecoder)    topicMessageStreams.values.foreach { streams =>      streams.foreach { stream =>        messageHandlerThreadPool.submit(new MessageHandler(stream))      }    }  }

启用WAL后虽然Receiver的数据可靠性风险降低了,但却由于磁盘持久化带来的开销,系统整体吞吐率会有明显的下降。因此,在最新发布的Spark 1.3版本里,Spark Streaming增加了使用Direct API的方式来实现Kafka数据源的访问。

引入了Direct API后,Spark Streaming不再启动常驻的Receiver接收任务,而是直接分配给每个Batch及RDD最新的topic partition offset。job启动运行后Executor使用Kafka的simple consumer API去获取那一段offset的数据。

这样做的好处不仅避免了Receiver宕机带来的数据可靠性风险,同时也由于避免使用ZooKeeper做offset跟踪,而实现了数据的精确一次性(以下代码删除了细枝末节的逻辑):

class DirectKafkaInputDStream{  protected val kc = new KafkaCluster(kafkaParams)  protected var currentOffsets = fromOffsets  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))    val rdd = KafkaRDD[K, V, U, T, R](      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)    Some(rdd)  }

预写日志 Write Ahead Log

Spark 1.2开始提供了预写日志能力,用于Receiver数据及Driver元数据的持久化和故障恢复。WAL之所以能提供持久化能力,是因为它利用了可靠的HDFS做数据存储。

Spark Streaming预写日志机制的核心API包括:

  • 管理WAL文件的WriteAheadLogManager

  • 读/写WAL的WriteAheadLogWriter和WriteAheadLogReader

  • 基于WAL的RDD:WriteAheadLogBackedBlockRDD

  • 基于WAL的Partition:WriteAheadLogBackedBlockRDDPartition

以上核心API在数据接收和恢复阶段的交互示意图如图四所示。

如何理解Spark Streaming的数据可靠性和一致性

图四 基于WAL的数据接收和恢复示意图

从WriteAheadLogWriter的源码里可以清楚地看到,每次写入一块数据buffer到HDFS后都会调用flush方法去强制刷入磁盘,然后才去取下一块数据。因此receiver接收的数据是可以保证持久化到磁盘了,因而做到了较好的数据可靠性。

private[streaming] class WriteAheadLogWriter{  private lazy val stream = HdfsUtils.getOutputStream(path, hadoopConf)  def write(data: ByteBuffer): WriteAheadLogFileSegment = synchronized {    data.rewind() // Rewind to ensure all data in the buffer is retrieved    val lengthToWrite = data.remaining()    val segment = new WriteAheadLogFileSegment(path, nextOffset, lengthToWrite)    stream.writeInt(lengthToWrite)    if (data.hasArray) {      stream.write(data.array())    } else {      while (data.hasRemaining) {        val array = new Array[Byte](data.remaining)        data.get(array)        stream.write(array)      }    }    flush()    nextOffset = stream.getPos()    segment  }

看完上述内容,你们掌握如何理解Spark Streaming的数据可靠性和一致性的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/149263.html

(0)

相关推荐

  • Android界面设计基础中控件焦点的步骤是什么

    技术Android界面设计基础中控件焦点的步骤是什么这篇文章给大家介绍Android界面设计基础中控件焦点的步骤是什么,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。Android设备有多种多样,操纵

    攻略 2021年11月26日
  • 北京名胜古迹介绍,北京现存的文物古迹有哪些

    技术北京名胜古迹介绍,北京现存的文物古迹有哪些很高兴回答你的问题!以下是我罗列的21处北京现存的文物古迹景点北京名胜古迹介绍,希望能对你有所帮助!1、北京故宫,国家5A级景区、世界文化遗产、全国重点文物保护单位。世界上现

    生活 2021年10月28日
  • 微信支付集成工具是什么

    技术微信支付集成工具是什么这篇文章将为大家详细讲解有关微信支付集成工具是什么,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。微信支付集成工具最近老板又安排了新项目,要接入微

    攻略 2021年10月20日
  • 儒林外史王冕,儒林外史人物故事及性格

    技术儒林外史王冕,儒林外史人物故事及性格儒林外史人物故事性格儒林外史王冕:王冕
    1.王冕是历史上真实存在的人物,作者据此进行了改编。王冕在小说中具有重要的作用,在整个小说的人物塑造上,他奠定了作者理想人物的基本特点,正如

    生活 2021年10月21日
  • 户部掌管什么,古代掌管财政的是什么官员

    技术户部掌管什么,古代掌管财政的是什么官员中国古代专门掌管户籍财经的机关称为:户部   户部为六部之一,长官为户部尚书,曾称地官、大司徒、计相、大司农等户部掌管什么。   户部起源于先秦,《周庄》记载此职为“地官大司徒”

    生活 2021年10月25日
  • JVM字符串常量池及String的intern方法是什么样的

    技术JVM字符串常量池及String的intern方法是什么样的这篇文章给大家介绍JVM字符串常量池及String的intern方法是什么样的,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。关于字符串

    攻略 2021年10月23日