Kafka, a popular distributed publish-subscribe messaging system known for its high throughput, low latency and high reliability, has become a common source of streaming data for Spark Streaming.
Put plainly, the approach suggested by the official documentation is this: each batch's RDD produced by the direct stream can be cast to HasOffsetRanges, which exposes OffsetRange objects carrying all the offset information for every partition of the topic. Spark Streaming produces a fresh set of these objects for every batch, so all you need to do is persist them in a suitable store (for example HBase or HDFS) after the batch has been processed, and you are free to manage the offsets yourself.
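A minimal sketch of that pattern with the 0.10 direct stream (the first example below uses the older 0.8 API, where the cast is exactly the same); saveOffsets here is only a placeholder for whichever store you choose:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

object OffsetPattern {
  // Persist the end offset of every partition processed in this batch (MySQL/HBase/Redis -- see the examples below)
  def saveOffsets(ranges: Array[OffsetRange]): Unit =
    ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))

  def process(stream: InputDStream[ConsumerRecord[String, String]]): Unit =
    stream.foreachRDD { rdd =>
      // Only the KafkaRDD produced by the direct stream can be cast to HasOffsetRanges,
      // so grab the ranges before any transformation that changes the RDD type
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.map(_.value()).foreach(println) // business logic first
      saveOffsets(offsetRanges)           // then persist untilOffset for each topic/partition
    }
}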
Create a new table, offset, in the database; the table structure is designed as shown in the figure.
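The figure is not reproduced here, but from the code below the table holds groupId, topic, partitions and untilOffset, and needs a primary or unique key so that replace into can overwrite the previous row. A possible DDL, executed here through ScalikeJDBC as used later (column names follow the code; column types are assumptions):

import scalikejdbc._
import scalikejdbc.config.DBs

object CreateOffsetTable {
  def main(args: Array[String]): Unit = {
    DBs.setup() // reads db.default.* from application.conf (shown below)
    DB.autoCommit { implicit session =>
      // Primary key on (groupId, topic, partitions) lets `replace into` overwrite one row per partition
      SQL("""create table if not exists offset (
            |  groupId     varchar(100) not null,
            |  topic       varchar(100) not null,
            |  partitions  int          not null,
            |  untilOffset bigint       not null,
            |  primary key (groupId, topic, partitions)
            |)""".stripMargin).execute.apply()
    }
    DBs.close()
  }
}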
// Database configuration: in IDEA, create a new file named application.conf under the resources folder
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://hadoop01:3306/kafkaOffset?characterEncoding=utf-8"
db.default.user="root"
db.default.password="root"
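For DBs.setup() to pick up this file, the ScalikeJDBC core and config modules plus the MySQL driver have to be on the classpath; a hedged sbt sketch (the version strings are placeholders to be aligned with your Scala and Spark build):

// build.sbt (versions are placeholders)
libraryDependencies ++= Seq(
  "org.scalikejdbc" %% "scalikejdbc"          % "<scalikejdbc-version>",
  "org.scalikejdbc" %% "scalikejdbc-config"   % "<scalikejdbc-version>",
  "mysql"            % "mysql-connector-java" % "<connector-version>"
)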
/* Save the offsets in MySQL */
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka.KafkaCluster.Err
import scalikejdbc._
import scalikejdbc.config.DBs

object SparkStreamingOffsetMySql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("medd").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Duration(5000))
    // Basic configuration
    val groupid = "GPMMCC"
    val topic = "mysqlDemo"
    val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
    // val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    val topics = Set(topic)
    // Kafka parameters
    val kafkaParams = Map(
      "metadata.broker.list" -> brokerList,
      "group.id" -> groupid,
      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString
    )

    // Load application.conf and connect to MySQL (see https://www.jianshu.com/p/2369a020e604)
    DBs.setup()

    // No need to query ZooKeeper for the offsets any more: read them straight from MySQL
    val fromdbOffset: Map[TopicAndPartition, Long] =
      DB.readOnly { implicit session =>
        // Read every row belonging to this consumer group and turn it into (TopicAndPartition, offset) tuples
        SQL(s"select * from offset where groupId = '${groupid}'")
          .map(m => (TopicAndPartition(m.string("topic"), m.string("partitions").toInt), m.string("untilOffset").toLong))
          .toList().apply()
      }.toMap

    // The DStream used to pull the data
    var kafkaDStream: InputDStream[(String, String)] = null

    // Decide based on what was found in MySQL
    if (fromdbOffset.isEmpty) {
      // First start: no stored offsets yet
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    } else {
      // 1. do not consume the same data twice
      // 2. make sure the stored offsets are still valid
      var checkOffset = Map[TopicAndPartition, Long]()
      // Load the Kafka cluster metadata
      val kafkaCluster = new KafkaCluster(kafkaParams)
      // First get the earliest offset Kafka still holds for every topic/partition
      val earliestOffsets: Either[Err, Map[TopicAndPartition, KafkaCluster.LeaderOffset]] =
        kafkaCluster.getEarliestLeaderOffsets(fromdbOffset.keySet)
      // Then compare the offsets stored in MySQL with the earliest offsets in Kafka
      if (earliestOffsets.isRight) {
        val tap: Map[TopicAndPartition, KafkaCluster.LeaderOffset] = earliestOffsets.right.get
        // Keep the larger of the two so nothing is consumed twice and no offset
        // that retention has already deleted is requested
        checkOffset = fromdbOffset.map(f => {
          val kafkaEarliestOffset = tap(f._1).offset
          if (f._2 > kafkaEarliestOffset) f else (f._1, kafkaEarliestOffset)
        })
      }
      // The message handler returns (key, value) for every record
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
        (mmd.key(), mmd.message())
      }
      // Not the first start: resume from the checked offsets
      kafkaDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, checkOffset, messageHandler)
    }

    var offsetRanges = Array[OffsetRange]()
    kafkaDStream.foreachRDD(kafkaRDD => {
      offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      val map: RDD[String] = kafkaRDD.map(_._2)
      map.foreach(println)
      // Update the offsets: one row per topic/partition
      DB.localTx(implicit session => {
        for (o <- offsetRanges) {
          /* SQL("update offset set groupId=?, topic=?, partition=?, untilOffset=?")
               .bind(groupid, o.topic, o.partition, o.untilOffset).update().apply() */
          SQL("replace into offset(groupId,topic,partitions,untilOffset) values(?,?,?,?)")
            .bind(groupid, o.topic, o.partition.toString, o.untilOffset.toString)
            .update().apply()
        }
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
Source: https://blog.csdn.net/Lu_Xiao_Yue/article/details/84110045
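The program above writes only the offsets inside DB.localTx. If the computed results also go to MySQL, writing them in the same transaction makes each batch effectively exactly-once: either the results and the new offsets both commit, or neither does. A minimal sketch, assuming a hypothetical wordcount(word, cnt) table with a unique key on word:

import org.apache.spark.streaming.kafka.OffsetRange
import scalikejdbc._

object ExactlyOnceCommit {
  // Called from foreachRDD with the batch's aggregated result and its offset ranges
  def commitBatch(groupid: String, counts: Seq[(String, Long)], ranges: Seq[OffsetRange]): Unit =
    DB.localTx { implicit session =>
      counts.foreach { case (word, cnt) =>
        SQL("insert into wordcount(word, cnt) values (?, ?) on duplicate key update cnt = cnt + ?")
          .bind(word, cnt, cnt).update().apply()
      }
      ranges.foreach { o =>
        SQL("replace into offset(groupId,topic,partitions,untilOffset) values(?,?,?,?)")
          .bind(groupid, o.topic, o.partition, o.untilOffset).update().apply()
      }
    }
}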
/* The Kafka offsets are kept in a database; before pulling data from Kafka, Spark first reads the offsets from the database. */
import java.sql.{DriverManager, ResultSet}
import scala.collection.mutable
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object StreamingKafkaMysqlOffset {
  // Set the log level
  Logger.getLogger("org").setLevel(Level.WARN)

  def main(args: Array[String]): Unit = {
    // SparkConf for a local run
    val conf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName(this.getClass.getSimpleName)
      // Maximum number of messages pulled from Kafka per partition per second
      .set("spark.streaming.kafka.maxRatePerPartition", "100")
      // Serialization
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Enabling RDD compression is recommended
      .set("spark.rdd.compress", "true")

    val ssc: StreamingContext = new StreamingContext(conf, Seconds(1))

    val groupId = "topic_group0"
    val topic = "order"
    val topics = Array(topic)

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hadoop:9092,hadoop-01:9092,hadoop-02:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // offsets are maintained by this application
    )

    // Offsets to start from
    val offsets: mutable.HashMap[TopicPartition, Long] = mutable.HashMap[TopicPartition, Long]()
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=utf-8", "root", "123456")
    val pstm = conn.prepareStatement("select * from mysqloffset where groupId = ? and topic = ? ")
    pstm.setString(1, groupId)
    pstm.setString(2, topic)
    val result: ResultSet = pstm.executeQuery()
    while (result.next()) {
      // Load the offsets stored in the database
      val p = result.getInt("partition")
      val f = result.getLong("untilOffset")
      // offsets += (new TopicPartition(topic,p)-> f)
      val partition: TopicPartition = new TopicPartition(topic, p)
      offsets.put(partition, f)
    }

    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      Subscribe[String, String](topics, kafkaParams, offsets)
    )

    // Process each batch, then persist its offsets
    stream.foreachRDD(rdd => {
      // The offset ranges of this batch
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      println("number of ranges = " + ranges.length)
      ranges.foreach(println)

      // Word count on the message values
      val result = rdd.map(_.value()).flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
      result.foreach(println)

      // result.foreachPartition(p => {
      //   val jedis: Jedis = ToolsRedisMysql.getJedis()
      //   // val jedis = RedisUtils.getJedis
      //   p.foreach(wc => {
      //     jedis.hincrBy("wc1", wc._1, wc._2)
      //   })
      //   jedis.close()
      // })

      // Write the offset ranges back to MySQL.
      // Only the end offset plus the groupId need to be saved; the start offset is not required.
      ranges.foreach(range => {
        val pstm = conn.prepareStatement("replace into mysqloffset values (?,?,?,?)")
        pstm.setString(1, range.topic)
        pstm.setInt(2, range.partition)
        pstm.setLong(3, range.untilOffset)
        pstm.setString(4, groupId)
        pstm.execute()
        pstm.close()
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
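Note that replace into mysqloffset values (?,?,?,?) only replaces an existing row if the table has a primary or unique key covering topic, partition and groupId; without one, every batch appends a new row. A possible DDL matching the column order used above (column types are assumptions), created through the same JDBC connection string:

import java.sql.DriverManager

object CreateMysqlOffsetTable {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=utf-8", "root", "123456")
    val stmt = conn.createStatement()
    // Column order matches the four placeholders of `replace into mysqloffset values (?,?,?,?)`
    stmt.execute(
      """create table if not exists mysqloffset (
        |  topic       varchar(100) not null,
        |  `partition` int          not null,
        |  untilOffset bigint       not null,
        |  groupId     varchar(100) not null,
        |  primary key (topic, `partition`, groupId)
        |)""".stripMargin)
    stmt.close()
    conn.close()
  }
}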
import java.net.URLDecoder
import scala.collection.mutable
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, KeyValue, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, HTable, Put, Result, Scan}
import org.apache.hadoop.hbase.filter.{BinaryComparator, RowFilter}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

/** Single-group case: offsets are managed by hand.
 * 1. Read the offsets from HBase before pulling data from Kafka (there is no group-managed
 *    consumption, so no group information is stored).
 *    HBase table: hbase_consumer_offset, column family: topic_partition_offset,
 *    columns: topic, partition, offset, rowkey: topic_partition
 * 2. After a batch has been processed, save its untilOffset back to HBase.
 * 3. If Kafka has been down for a long time, reading should restart from Kafka's earliest
 *    offset; that case still needs to be handled (see the sketch after this example).
 */
object OffsetOperate {
  var hbaseProp = PropertiesUtil.getProperties("hbase")
  var kafkaconsumePro = PropertiesUtil.getProperties("kafkaconsume")

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkStreaming - offset operate")
      .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn
      .set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // Kafka configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> kafkaconsumePro.getProperty("bootstrap.servers"),
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> kafkaconsumePro.getProperty("group"),
      "auto.offset.reset" -> "earliest", // on the very first run, start from the beginning of the topic
      "enable.auto.commit" -> (false: java.lang.Boolean) // Kafka does not store the consumed offsets
    )

    // Topics to subscribe to
    val topics = Array(kafkaconsumePro.getProperty("topics"))

    // HBase connection
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", hbaseProp.getProperty("quorum")) // ZooKeeper quorum
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("hbase.master", hbaseProp.getProperty("hbase_master"))
    hbaseConf.set("hbase.defaults.for.version.skip", "true")
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val admin = conn.getAdmin
    val tn = TableName.valueOf("hbase_consumer_offset") // HBase table name
    val isExist = admin.tableExists(tn)

    val streams: InputDStream[ConsumerRecord[String, String]] = {
      if (isExist) {
        val table = new HTable(hbaseConf, "hbase_consumer_offset")
        // Only scan the rows whose rowkey starts with "<topic>_"
        val filter = new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes(topics(0) + "_")))
        println("============ filter created ==========")
        val s = new Scan()
        s.setFilter(filter)
        val rs = table.getScanner(s)

        // Rebuild the starting offsets from the stored rows
        val fromOffsets = scala.collection.mutable.Map[TopicPartition, Long]()
        var storedTopic = ""
        var storedPartition = 0
        var storedOffset: Long = 0
        for (r: Result <- rs.next(200)) {
          println("rowKey : " + new String(r.getRow))
          for (keyvalue: KeyValue <- r.raw()) {
            if ("topic".equals(new String(keyvalue.getQualifier))) {
              storedTopic = new String(keyvalue.getValue)
              println("columnFamily :" + new String(keyvalue.getFamily) + " column :" + new String(keyvalue.getQualifier) + storedTopic)
            } else if ("partition".equals(new String(keyvalue.getQualifier))) {
              storedPartition = Bytes.toInt(keyvalue.getValue)
              println("columnFamily :" + new String(keyvalue.getFamily) + " column :" + new String(keyvalue.getQualifier) + storedPartition)
            } else if ("offset".equals(new String(keyvalue.getQualifier))) {
              storedOffset = Bytes.toLong(keyvalue.getValue)
              println("columnFamily :" + new String(keyvalue.getFamily) + " column :" + new String(keyvalue.getQualifier) + storedOffset)
            }
          }
          fromOffsets.put(new TopicPartition(storedTopic, storedPartition), storedOffset)
        }
        println("fromOffsets is : " + fromOffsets)
        KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
          ConsumerStrategies.Assign[String, String](fromOffsets.keySet, kafkaParams, fromOffsets))
      } else {
        // The offset table does not exist in HBase yet: create it and consume from the beginning of the topic
        val htable = new HTableDescriptor(TableName.valueOf("hbase_consumer_offset"))
        htable.addFamily(new HColumnDescriptor("topic_partition_offset"))
        admin.createTable(htable)
        println("table created successfully ======== " + htable)
        KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
      }
    }

    // val dstream = streams.map(x => URLDecoder.decode(x.value()))

    // After the batch has been processed successfully, write the new offsets back
    streams.foreachRDD { rdd =>
      // if (!rdd.isEmpty()) {
      // Ideally the business computation and the offset write are wrapped in one transaction,
      // so that both succeed or both fail, which is what gives exactly-once consumption.
      import scala.collection.JavaConversions._
      val table = new HTable(hbaseConf, "hbase_consumer_offset")
      table.setAutoFlush(false, false)
      var putList: List[Put] = List()
      // Cast the RDD[ConsumerRecord[String, String]] to read its offset ranges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (offsetRange <- offsetRanges) {
        println("the topic is " + offsetRange.topic)
        println("the partition is " + offsetRange.partition)
        println("the fromOffset is " + offsetRange.fromOffset)
        println("the untilOffset is " + offsetRange.untilOffset)
        println("the object is " + offsetRange)
        // Rowkey is topic_partition; the column family is specified in each Put
        val put = new Put(Bytes.toBytes(offsetRange.topic + "_" + offsetRange.partition))
        put.add(Bytes.toBytes("topic_partition_offset"), Bytes.toBytes("topic"), Bytes.toBytes(offsetRange.topic))
        put.add(Bytes.toBytes("topic_partition_offset"), Bytes.toBytes("partition"), Bytes.toBytes(offsetRange.partition))
        put.add(Bytes.toBytes("topic_partition_offset"), Bytes.toBytes("offset"), Bytes.toBytes(offsetRange.untilOffset))
        putList = put +: putList
      }
      // The business logic of the job goes here
      rdd.map(x => URLDecoder.decode(x.value(), "UTF-8")).collect().foreach(println)
      table.put(putList)
      table.flushCommits()
      println("add and compute data success !")
      // }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
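The header comment above leaves one case open: if Kafka has been down for a long time, or retention has already removed old data, the offsets stored in HBase may lie before the earliest offset the brokers still hold, and the direct stream would fail with an out-of-range error. A hedged sketch of one way to clamp the stored offsets, assuming kafka-clients 0.10.1+ (beginningOffsets is not available in older clients); consumerProps must contain bootstrap.servers and the key/value deserializers:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object OffsetClamp {
  // Replace every stored offset that is older than what the brokers still hold
  def clampToEarliest(fromOffsets: collection.Map[TopicPartition, Long],
                      consumerProps: Properties): Map[TopicPartition, Long] = {
    val consumer = new KafkaConsumer[String, String](consumerProps)
    try {
      val earliest: Map[TopicPartition, Long] =
        consumer.beginningOffsets(fromOffsets.keySet.asJava).asScala
          .map { case (tp, off) => tp -> off.longValue() }.toMap
      fromOffsets.map { case (tp, stored) => tp -> math.max(stored, earliest.getOrElse(tp, stored)) }.toMap
    } finally consumer.close()
  }
}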
Reference: https://www.jianshu.com/p/667e0f58b7b9
The Spark Streaming code for this approach is given below. Note that ConsumerRecord is not serializable, so take care not to ship it to other worker nodes (for example by collecting or shuffling the raw records), or serialization errors will occur, e.g. when printing.
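In practice this means reducing each record to its key and value before any operation that has to serialize it (collect, shuffles, windowed state); a minimal sketch of that pattern (stream creation is omitted):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object RecordToPairs {
  // ConsumerRecord is not serializable, so strip it down to plain key/value pairs first
  def toPairs(stream: InputDStream[ConsumerRecord[String, String]]): DStream[(String, String)] =
    stream.map(record => (record.key(), record.value()))
}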
2. With multiple partitions spread across brokers and multiple consumer groups, the key can be designed as topic_group_partition with the offset as the value.
In the example below, gtKey = groupid/topic is used as the unique identifier of a Redis hash, and each partition's offset is stored as a field of that hash:
conn.hset(gtKey, partition.toString, offset.toString)
http://www.pianshen.com/article/8095259521/
import java.util

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object KafkaDricteRedis {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("redis").setMaster("local[*]")
    val ssc = new StreamingContext(conf, new Duration(5000))

    val groupid = "GB01"     // consumer group name
    val topic = "wordcount3" // topic name
    // In Redis the partition offsets are stored in a hash whose key is groupid/topic
    val gtKey = groupid + "/" + topic
    val topics = Set(topic)
    // val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
    // broker list
    val brokerList = "hadoop01:9092,hadoop03:9092"

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokerList,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupid,
      "auto.offset.reset" -> "earliest", // consume from the beginning when no offsets are stored
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // topic/partition -> offset; passed in when creating the InputDStream,
    // so reading starts from these offsets
    val offsets = mutable.Map[TopicPartition, Long]()
    var kafkaDStream: InputDStream[ConsumerRecord[String, String]] = null

    // get a jedis connection
    val conn = getConnection()
    // conn.flushDB()
    // fetch all keys; if they contain GB01/wordcount3, this topic has been read before
    val values: util.Set[String] = conn.keys("*")
    // println(values) // [GB01/wordcount3]
    if (values.contains(gtKey)) {
      // all (field, value) pairs stored under GB01/wordcount3:
      // the field is the partition, the value is the offset
      val allKey: util.Map[String, String] = conn.hgetAll(gtKey)
      // after this import, Java collections can be used as Scala collections
      import scala.collection.JavaConversions._
      val list: List[(String, String)] = allKey.toList
      // loop over the (partition, offset) pairs
      for (key <- list) { // key is a tuple: (partition, offset)
        offsets.put(new TopicPartition(topic, key._1.toInt), key._2.toLong)
      }
      // not the first start: resume from the stored offsets
      kafkaDStream = KafkaUtils.createDirectStream[String, String](
        ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams, offsets))
    } else {
      // never read before: create a fresh InputDStream
      kafkaDStream = KafkaUtils.createDirectStream[String, String](
        ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    }

    // OffsetRange holds the partition and its offsets, used to update Redis
    var OffsetRangs = Array[OffsetRange]()

    kafkaDStream.foreachRDD(kafkaRDD => {
      // the RDD here is a KafkaRDD, so it can be cast to HasOffsetRanges
      OffsetRangs = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
      // take the values (the key is null by default and is not used)
      val map: RDD[String] = kafkaRDD.map(_.value())
      map.foreach(x => println(x + "==========================="))
      // update the offsets
      for (o <- OffsetRangs) {
        val offset = o.untilOffset
        val partition = o.partition
        println("partition: " + partition)
        println("offset: " + offset)
        // hset writes the partition and its offset into the hash
        conn.hset(gtKey, partition.toString, offset.toString)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }

  // Jedis connection pool
  def getConnection(): Jedis = {
    // the pool configuration must be set before the pool is created
    val conf = new JedisPoolConfig()
    conf.setMaxTotal(20) // maximum number of connections
    conf.setMaxIdle(20)  // maximum number of idle connections
    val pool = new JedisPool(conf, "hadoop01", 6379)
    val jedis = pool.getResource()
    jedis.auth("123") // password
    jedis
  }
}
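Since the program above only prints the values and writes the offsets afterwards, the guarantee is at-least-once. If the results themselves are kept in Redis, as in the commented-out hincrBy example earlier, results and offsets can be written in a single MULTI/EXEC transaction; a minimal sketch with a hypothetical result hash named wc1:

import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.Jedis

object RedisTxCommit {
  // Write the batch's word counts and its end offsets in one Redis transaction
  def commitBatch(jedis: Jedis, gtKey: String, counts: Seq[(String, Long)], ranges: Seq[OffsetRange]): Unit = {
    val tx = jedis.multi()
    counts.foreach { case (word, cnt) => tx.hincrBy("wc1", word, cnt) } // hypothetical result hash
    ranges.foreach(o => tx.hset(gtKey, o.partition.toString, o.untilOffset.toString))
    tx.exec()
  }
}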