Spark2.3.1 Kafka0.9使用直接的模式消费信息异常怎么办,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
依赖关系
groupIdorg.apache.spark/groupId
artifactIdspark-streaming-Kafka-0-8 _ 2.11/artifactId
版本2 .3 .1/版本
/依赖性
属国
groupIdorg.apache.spark/groupId
artifactIdspark-streaming _ 2.11/artifactId
版本2 .3 .1/版本
/dependency2.3.1即发动版本
Spark2.3.1+Kafka使用Direct模式消费信息
进口卡夫卡。序列化程序。字符串编辑器
导入组织。阿帕奇。火花。流媒体。卡夫卡。kafkautils
导入组织。阿帕奇。火花。流媒体。{秒,流上下文}
导入组织。阿帕奇。火花。{ SparkConf,SparkContext}
objectTest{
valzkQuorum=沧。cn:2181 '
valgroupId='nginx-cg '
valtopic=Map('nginx-log'-1)
valKAFKA_INTERVAL=10
case class ing of(:字符串中的域,ip:字符串)
defmain(args :数组[String]):单位={ 0
valsparkConf=newSparkConf().setAppName('NginxLogAnalyze ').setMaster('local[*]')
valparkcontext=NewSarkcontext(sparkConf)
valstreamContext=newStreamingContext(sparkContext,Seconds(KAFKA_INTERVAL))
valkafkaParam=映射[字符串,字符串](
自举。服务器'-' xx。xx。cn :9092 ',
group.id'-'nginx-cg ',
auto.offset.reset'-'最大'
)
valtopic=Set('nginx-log ')
valkafkaStream=kafkautils。createdirectstream(流上下文,kafkaParam,主题)
valcounter=kafkaStream。map(_ .toString().拆分("")。地图(项目=(项目(0))。拆分(',')(1) '-(第(2,1)项)。reduceByKey((x,y)=(x,y))
柜台。foreachrdd(rdd={ 0
rdd.foreach(println)
})
streamContext.start()
streamContext.awaitTermination()
}
}最大因为卡夫卡版本过低不支持最近的
Maven依赖
原因数据库: Java。朗。nosuchmethodexception 3360 Scala。运行时。没什么.init(卡夫卡。utils。可验证属性)
在Java。朗。上课。getconstructor 0(类。Java :3082)
在Java。朗。上课。getconstructor(类。Java :1825)
阿特罗格。阿帕奇。火花。流媒体。卡夫卡。kafkardd $ kafkardditerator。init(kafkardd。Scala :153)
阿特罗格。阿帕奇。火花。流媒体。卡夫卡。卡夫卡德。计算机(kafkardd。Scala :1360136)
阿特罗格。阿帕奇。火花。rdd。rdd。计算检查点。Scala :324)
阿特罗格。阿帕奇。火花。rdd。RDD。迭代器(RDD。Scala :288)
阿特罗格。阿帕奇。火花。rdd。mappartitionsdd。计算(mappartitionsdd。Scala 336038)
阿特罗格。阿帕奇。火花。rdd。rdd。计算检查点。Scala :324)
阿特罗格。阿帕奇。火花。rdd。RDD。迭代器(RDD。Scala :288)
阿特罗格。阿帕奇。火花。rdd。mappartitionsdd。计算(mappartitionsdd。Scala 336038)
阿特罗格。阿帕奇。火花。rdd。rdd。计算检查点。Scala :324)
阿特罗格。阿帕奇。火花。rdd。RDD。迭代器(RDD。Scala :288)
阿特罗格。阿帕奇。火花。调度程序。shufflemaptask。runtask(shufflemaptask。Scala :96)
阿特罗格。阿帕奇。火花。调度程序。shufflemaptask。runtask(shufflemaptask。Scala :53)
阿特罗格。阿帕奇。火花。调度程序。任务。奔跑
阿特罗格。阿帕奇。火花。执行者。执行者$ TaskRunner。运行(执行器。Scala :345)
.3更多
Direct模式代码
在验证卡夫卡属性时不能使用斯卡拉默认的类,需要指定卡夫卡带的类创建直接流[字符串,字符串,字符串代码,字符串代码]其中字符串编辑器必须是kafka.serializer.StringDecoder
看完上述内容,你们掌握Spark2.3.1 Kafka0.9使用直接的模式消费信息异常怎么办的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/146801.html