spark Streaming +kafka 的offset数据保存MySQL、hbase、redis_大数据_曹雪朋 的博客-CSDN博客

Kafka做为一款流行的分布式发布订阅消息系统,以高吞吐、低延时、高可靠的特点著称,已经成为Spark Streaming常用的流数据来源。

其实说白了,官方提供的思路就是,把JavaInputDStream转换为OffsetRange对象,该对象具有topic对应的分区的所有信息,每次batch处理完,Spark Streaming都会自动更新该对象,所以你只需要找个合适的地方保存该对象(比如HBase、HDFS),就可以愉快的操纵offset了。

一、SparkStreaming直连方式读取kafka数据,使用MySQL保存偏移量

在数据库中新建一张表Offset,表结构设计如图
在这里插入图片描述

å¨è¿éæå¥å¾çæè¿°

//配置数据库信息//使用IDEA,在resources文件夹下新建文件File文件名为application.confdb.default.driver="com.mysql.jdbc.Driver"db.default.url="jdbc:mysql://hadoop01:3306/kafkaOffset?characterEncodeing=utf-8"db.default.user="root"db.default.password="root"
/*将偏移量保存到MySQL中 */object SparkStreamingOffsetMySql {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("medd").setMaster("local[2]")    val ssc = new StreamingContext(conf,Duration(5000))    //配置一系列基本配置    val groupid = "GPMMCC"    val topic = "mysqlDemo"    val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092"   // val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"    val topics = Set(topic)    //设置kafka的参数    val kafkaParams = Map(      "metadata.broker.list"->brokerList,      "group.id"->groupid,      "auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString    )    //加载配置 application.conf  https://www.jianshu.com/p/2369a020e604    DBs.setup()     // connect to mysql     //不需要查询zk中的offset啦,直接查询MySQL中的offset    val fromdbOffset:Map[TopicAndPartition,Long]=      DB.readOnly{        implicit  session=>{          //查询每个分组下面的所有消息          SQL(s"select * from offset where groupId = '${groupid}'" +           //将MySQL中的数据赋值给元组            s"").map(m=>(TopicAndPartition(m.string("topic"),m.string("partitions").toInt),m.string("untilOffset").toLong))            .toList().apply()        }.toMap  //最后toMap ,应为前面的返回值已经给定      }     //创建一个DStream,用来获取数据    var kafkaDStream : InputDStream[(String,String)] = null     //从MySql中获取数据进行判断    if(fromdbOffset.isEmpty){      kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder,        StringDecoder](ssc,kafkaParams,topics)    }else{      //1\ 不能重复消费      //2\ 保证偏移量      var checkOffset = Map[TopicAndPartition,Long]()       //加载kafka的配置      val kafkaCluster = new KafkaCluster(kafkaParams)      //首先获得kafka中的所有的topic和partition Offset      val earliesOffset: Either[Err, Map[TopicAndPartition, KafkaCluster.LeaderOffset]        ] = kafkaCluster.getEarliestLeaderOffsets(fromdbOffset.keySet)       //然后开始比较大小,用mysql中的offset和kafka中的offset进行比较      if(earliesOffset.isRight){        //去到需要的 大Map        //物取值        val tap: Map[TopicAndPartition, KafkaCluster.LeaderOffset] =        earliesOffset.right.get        //比较,直接进行比较大小        val checkOffset = fromdbOffset.map(f => {          //取kafka中的offset          //进行比较,不需要重复消费,取最大的          val KafkatopicOffset = tap.get(f._1).get.offset          if (f._2 > KafkatopicOffset) {            f          } else {            (f._1, KafkatopicOffset)          }        })        checkOffset      }      val messageHandler=(mmd:MessageAndMetadata[String,String])=>{        (mmd.key(),mmd.message())      }      //不是第一次启动的话 ,按照之前的偏移量取数据的偏移量      kafkaDStream = KafkaUtils.createDirectStream[String,String,StringDecoder        ,StringDecoder,(String,String)](ssc,kafkaParams,checkOffset      ,messageHandler)    }    var offsetRanges = Array[OffsetRange]()    kafkaDStream.foreachRDD(kafkaRDD=>{     offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges      val map: RDD[String] = kafkaRDD.map(_._2)      map.foreach(println)       //更新偏移量        DB.localTx(implicit session =>{          //去到所有的topic partition offset          for (o<- offsetRanges){            /*SQL("update offset set groupId=? topic=?,partition=?," +              "untilsOffset=?").bind(groupid,o.topic,o.partition,o.untilOffset).update().apply()*/            SQL("replace into offset(groupId,topic,partitions,untilOffset) values(?,?,?,?)").bind(              groupid,o.topic,o.partition.toString,o.untilOffset.toString            ).update().apply()          }        })    })    ssc.start()    ssc.awaitTermination()  }} 原文:https://blog.csdn.net/Lu_Xiao_Yue/article/details/84110045 
/*kafka偏移量保存在数据库,spark从kafka拉去数据时候,先读取数据库偏移量*/object StreamingKafkaMysqlOffset {  //设置日志级别  Logger.getLogger("org").setLevel(Level.WARN)   def main(args: Array[String]): Unit = {    //conf 本地运行设置    val conf: SparkConf = new SparkConf()      .setMaster("local[*]")      .setAppName(this.getClass.getSimpleName)      //每秒钟每个分区kafka拉取消息的速率      .set("spark.streaming.kafka.maxRatePerPartition", "100")      // 序列化      .set("spark.serilizer", "org.apache.spark.serializer.KryoSerializer")      // 建议开启rdd的压缩      .set("spark.rdd.compress", "true")     //SparkStreaming    val ssc: StreamingContext = new StreamingContext(conf, Seconds(1))     // kafka的参数配置    val kafkaParams = Map[String, Object](      "bootstrap.servers" -> "hadoop:9092,hadoop-01:9092,hadoop-02:9092",      "key.deserializer" -> classOf[StringDeserializer],      "value.deserializer" -> classOf[StringDeserializer],      "group.id" -> groupId,      "auto.offset.reset" -> "earliest",      "enable.auto.commit" -> (false: java.lang.Boolean) //自己维护偏移量    )    val groupId = "topic_group0"    val topic = "order"    val topics = Array(topic)    // 需要设置偏移量的值    val offsets: mutable.HashMap[TopicPartition, Long] = mutable.HashMap[TopicPartition, Long]()    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=utf-8", "root", "123456")     val pstm = conn.prepareStatement("select * from mysqloffset where groupId = ? and topic = ? ")    pstm.setString(1, groupId)    pstm.setString(2, topic)     val result: ResultSet = pstm.executeQuery()    while (result.next()) {      // 把数据库中的偏移量数据加载了      val p = result.getInt("partition")      val f = result.getInt("untilOffset")      //      offsets += (new TopicPartition(topic,p)-> f)      val partition: TopicPartition = new TopicPartition(topic, p)      offsets.put(partition, f)    }     val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](      ssc,      LocationStrategies.PreferConsistent,      Subscribe[String, String](topics, kafkaParams, offsets)    )     //转换成RDD    stream.foreachRDD(rdd => {      //手动指定分区的地方      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges      println("长度=" + ranges.length)      ranges.foreach(println)      //: RDD[(String, Int)]      val result = rdd.map(_.value()).flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)      result.foreach(println)       //      result.foreachPartition(p => {      //        val jedis: Jedis = ToolsRedisMysql.getJedis()      //        //        val jedis = RedisUtils.getJedis      //        p.foreach(zookeeper => {      //          jedis.hincrBy("wc1", zookeeper._1, zookeeper._2)      //        })      //        jedis.close()      //      })       // 把偏移量的Array  写入到mysql中      ranges.foreach(zookeeper => {        // 思考,需要保存哪些数据呢?   起始的offset不需要  还需要加上 groupid         val pstm = conn.prepareStatement("replace into mysqloffset values (?,?,?,?)")        pstm.setString(1, zookeeper.topic)        pstm.setInt(2, zookeeper.partition)        pstm.setLong(3, zookeeper.untilOffset)        pstm.setString(4, groupId)        pstm.execute()        pstm.close()      })    })    ssc.start()    ssc.awaitTermination()   }}

二、offset 保存到hbase

 import scala.collection.mutable /**  单个跟组情况  * 手工操作offset  *        1 从hbase获取offset,从kafka拉取数据没有分组消费,所以没有分组信息    htable: hbase_consumer_offset    Family: topic_partition_offset    column: topic             partition            offset   rowkey:topic_partition  *        2 数据处理完后,把until offset 保存到hbase  *        3 kafka 长时间挂掉之后,从kafka最早的offset 开始读取 此处还需要处理     */object OffsetOperate {  var hbaseProp = PropertiesUtil.getProperties("hbase")  var kafkaconsumePro = PropertiesUtil.getProperties("kafkaconsume")  def main(args: Array[String]): Unit = {   val conf = new SparkConf().setAppName("sparkStreaming - offset operate")    .setMaster("local[2]") // --master local[2] | spark://xx:7077 | yarn    .set("spark.testing.memory", "2147480000")    val sc = new SparkContext(conf)    val ssc = new StreamingContext(sc,Seconds(5))     //kafka配置    val kafkaParams = Map[String, Object](      "bootstrap.servers" -> kafkaconsumePro.getProperty("bootstrap.servers"),      "key.deserializer" -> classOf[StringDeserializer],      "value.deserializer" -> classOf[StringDeserializer],      "group.id" -> kafkaconsumePro.getProperty("group"),      "auto.offset.reset" -> "earliest", // 第一次读取时从topic 首位置开始读取      "enable.auto.commit" -> (false: java.lang.Boolean)// kafka 不保存消费的offset    )     //监听频道    val topics = Array(kafkaconsumePro.getProperty("topics"))    // 获取hbase连接    val hbaseConf = HBaseConfiguration.create()    hbaseConf.set("hbase.zookeeper.quorum",hbaseProp.getProperty("quorum")) //zookeeper 集群    hbaseConf.set("hbase.zookeeper.property.client","2181")    hbaseConf.set("hbase.master", hbaseProp.getProperty("hbase_master"))    hbaseConf.set("hbase.defaults.for.version.skip", "true")    //获取连接对象    val conn = ConnectionFactory.createConnection(hbaseConf)    val admin = conn.getAdmin    val tn = TableName.valueOf("hbase_consumer_offset") //hbase 表名    val isExist = admin.tableExists(tn)    val streams : InputDStream[ConsumerRecord[String,String]]= {    if(isExist) {      val table = new HTable(hbaseConf, "hbase_consumer_offset")      val filter = new RowFilter(CompareOp.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes(topics + "_")))      println("============ 过滤器已经创建 ==========")      val s = new Scan()      s.setFilter(filter)      val rs = table.getScanner(s)       // 设置 offset      val fromOffsets = scala.collection.mutable.Map[TopicPartition, Long]()      var s1 = ""      var s2 = 0      var s3: Long = 0        for (r: Result <- rs.next(200)) {          println("rowKey : " + new String(r.getRow))          for (keyvalue: KeyValue <- r.raw()) {            if ("topic".equals(new String(keyvalue.getQualifier))) {              s1 = new String(keyvalue.getValue)              println("columnFamily :" + new String(keyvalue.getFamily) + " column :" +new String( keyvalue.getQualifier) + s1)            } else if ("partition".equals(new String(keyvalue.getQualifier))){              s2 = Bytes.toInt(keyvalue.getValue)              println("columnFamily :" +  new String(keyvalue.getFamily) + " column :" + new String( keyvalue.getQualifier) + s2)            } else if("offset".equals(new String(keyvalue.getQualifier))) { //if("offset".equals(new String(keyvalue.getQualifier)))              s3 = Bytes.toLong(keyvalue.getValue)              println("columnFamily :" + new String(keyvalue.getFamily) + " column :" + new String( keyvalue.getQualifier) + s3)            }          }          fromOffsets.put(new TopicPartition(s1, s2), s3)        }      println("fromOffset is : "+fromOffsets)        KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent,          ConsumerStrategies.Assign(fromOffsets.keySet, kafkaParams, fromOffsets)) //(fromOffsets.keySet,kafkaParams,fromOffsets))      }else{ //Hbase 里面不存在offset表,从topic首位置开始消费        val htable = new HTableDescriptor(TableName.valueOf("hbase_consumer_offset"))        htable.addFamily(new HColumnDescriptor(("topic_partition_offset")))        admin.createTable(htable)        println("表已经创建成功========" + htable)      KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe(topics, kafkaParams))      }    }  // val dstream = streams.map(x=>URLDecoder.decode(x.value()))     // 操作成功后更新offset    streams.foreachRDD{ rdd =>      //if(!rdd.isEmpty()){      // 打成一个事务,把业务计算和offset保存放在一起,要么成功,要么一起失败,实现精确一次的消费      import scala.collection.JavaConversions._      val table = new HTable(hbaseConf,"hbase_consumer_offset")      table.setAutoFlush(false, false)      var putList:List[Put] = List()        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  // RDD[ConsumerRecord[String,String]] 强转成offsetRanges        for(offsetRange <- offsetRanges){          println("the topic is "+offsetRange.topic)          println("the partition is "+offsetRange.partition)          println("the fromOffset is "+offsetRange.fromOffset)          println("the untilOffset is "+offsetRange.untilOffset)          println("the object is "+offsetRange)         // val table = new HTable(hbaseConf,"hbase_consumer_offset")         // table.setAutoFlush(false, false)          val put  = new Put(Bytes.toBytes(offsetRange.topic+"_"+offsetRange.partition))//put时候指定列族          put.add(Bytes.toBytes("topic_partition_offset"),Bytes.toBytes("topic"),Bytes.toBytes(offsetRange.topic))          put.add(Bytes.toBytes("topic_partition_offset"),Bytes.toBytes("partition"),Bytes.toBytes(offsetRange.partition))          put.add(Bytes.toBytes("topic_partition_offset"),Bytes.toBytes("offset"),Bytes.toBytes(offsetRange.untilOffset))          putList = put+:putList         // println("add data success !")        }         println("the RDD records are "+rdd.map{x =>URLDecoder.decode(x.value())}.collect.foreach(println)) // 程序的计算逻辑      //  }      table.put(putList)      table.flushCommits()      println("add and compute data success !")      }    ssc.start()    ssc.awaitTermination()  }}

参考链接 :https://www.jianshu.com/p/667e0f58b7b9
 
实现的Spark Streaming代码如下(ConsumerRecord类不能序列化,使用时要注意,不要分发该类到其他工作节点上,避免错误打印)

三、存储在redis(基于内存)读写更快,

2、多个服务器分区,多个组消费组,设计key: 主题_分组_分区;   value :offset

gtKey=groupid/topic作为唯一标识   

conn.hset(gtKey, partition.toString, offset.toString)

http://www.pianshen.com/article/8095259521/

 object KafkaDricteRedis {   def main(args: Array[String]): Unit = {    val conf = new SparkConf().setAppName("redis").setMaster("local[*]")    val ssc = new StreamingContext(conf, new Duration(5000))     val groupid = "GB01" //组名    val topic = "wordcount3"    //topic 名    //在redis中以 groupid/topic作为唯一标识 ,存储分区偏移量    //在Reids 使用的时hash类型来存储    val gtKey = groupid + "/" + topic    //topic    val topics = Set(topic)    //zk地址    val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181"    //brokerList    val brokerList = "hadoop01:9092,hadoop03:9092"     val kafkaParams = Map(      // metadata.broker.list      "metadata.broker.list" -> brokerList,      "group.id" -> groupid,      "auto.offset.reset" -> kafka.api.OffsetRequest.SmallestTimeString      //从头开始消费    )    //记录topic 、分区对应的偏移量偏移量,在创建InputDStream时作为参数传如    //从这个偏移量开始读取    var fromOffset: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()    var offsets =   Map[TopicPartition, Long]()     var kafkaDStream: InputDStream[(String, String)] = null    //    获取一个jedis连接    val conn = getConnection()    // conn.flushDB()    //jd.hget(groupid+topic,"")    //获取全部的keys    val values: util.Set[String] = conn.keys("*")    //println(values)    // [GB01/wordcount3]   分区数   偏移量    //如果keys中包含 GB01/wordcount3这样的key,则表示以前读取过    if (values.contains(gtKey)) {      //获取key 为GB01/wordcount3 下面所对应的(k,v)      var allKey: util.Map[String, String] = conn.hgetAll(gtKey)      //导入后,可以把Java中的集合转换为Scala中的集合      import scala.collection.JavaConversions._      var list: List[(String, String)] = allKey.toList      //循环得到的(k,v)      //这里面的 k 对应的是分区, v对应的是偏移量      for (key <- list) { //这里的key是一个tuple类型        //new一个TopicAndPartition 把 topic 和分区数传入        val tp = new TopicAndPartition(topic, key._1.toInt)        //把每个topic 分区 对应的偏移量传入        fromOffset += tp -> key._2.toLong         // 把数据库中的偏移量数据加载了        val p = key._1.toInt        val f =  key._2.toLong//        offsets += (new TopicPartition(topic,p)-> f)        val partition: TopicPartition = new TopicPartition(topic, p)        offsets.put(partition, f)        }      //这里的是把数据(key ,value)是kafka 的key默认是null,      //value 是kafka中的value      val messageHandler = (mmd: MessageAndMetadata[String, String]) => {        (mmd.key(), mmd.message())      }       val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](        ssc,        LocationStrategies.PreferConsistent,        Subscribe[String, String](topics, kafkaParams, offsets)      )    } else {      //如果以前没有读取过,创建一个新的InputDStream      val stream = KafkaUtils.createDirectStream[String, String](        ssc,        PreferConsistent,        Subscribe[String, String](topics, kafkaParams))    }    //用来更新偏移量,OffsetRange中可以获取分区及偏移量    var OffsetRangs = Array[OffsetRange]()    //    kafkaDStream.foreachRDD(kafkaRDD => {      //这里面的RDD是kafkaRDD ,可以转换为HasOffsetRange      val ranges: HasOffsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges]      OffsetRangs = ranges.offsetRanges      //获取value,(key 默认是null,没有用)      val map: RDD[String] = kafkaRDD.map(_._2)      map.foreach(x => println(x + "==========================="))      //更新偏移量      for (o <- OffsetRangs) {        //取出偏移量        val offset = o.untilOffset        //取出分区        val partition = o.partition        println("partition: " + partition)        println("offset: " + offset)        //把通过hset,把对应的partition和offset写入到redis中        conn.hset(gtKey, partition.toString, offset.toString)      }    })    ssc.start()    ssc.awaitTermination()  }  //Jedis连接池  def getConnection(): Jedis = {    //new 一个JedisPoolConfig,用来设定参数    val conf = new JedisPoolConfig()    val pool = new JedisPool(conf, "hadoop01", 6379)    //最大连接数    conf.setMaxTotal(20)    //最大空闲数    conf.setMaxIdle(20)     val jedis = pool.getResource()    //密码    jedis.auth("123")    jedis  } }

原网址: 访问
创建于: 2020-05-13 05:58:51
目录: default
标签: 无

请先后发表评论
  • 最新评论
  • 总共0条评论