Creating a Kafka input stream with the spark-streaming-kafka-0-10_2.11-2.0.0 dependency - 学海无涯 - CSDN Blog

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

object DirectStream {

  def main(args: Array[String]): Unit = {
    // Create the SparkConf. When submitting the job to a cluster, remove .setMaster("local[2]").
    val conf = new SparkConf().setAppName("DirectStream").setMaster("local[2]")
    // Create a StreamingContext (it wraps a SparkContext) with a 5-second batch interval.
    val streamingContext = new StreamingContext(conf, Seconds(5))

    // Kafka consumer parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node-1.xiaoniu.com:9092,node-2.xiaoniu.com:9092,node-3.xiaoniu.com:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test123",
      "auto.offset.reset" -> "earliest", // or "latest"
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("xiaoniu")
    // Create the direct stream; the consumed offsets are recorded in Kafka (committed below).
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      // Location strategy: distribute partitions evenly across the available executors
      LocationStrategies.PreferConsistent,
      // Consumer strategy: subscribe to a fixed collection of topics
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Iterate over the RDDs (KafkaRDDs) in the DStream, one RDD per batch interval
    stream.foreachRDD { rdd =>
      // Get the offset ranges covered by this RDD
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Process the records
      rdd.foreach { line =>
        println(line.key() + " " + line.value())
      }
      // Asynchronously commit the offsets back to Kafka,
      // some time later, after outputs have completed
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
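The order of the last two steps in foreachRDD (output first, commit second) is what gives at-least-once delivery: if the output fails, nothing is committed and the same offsets are read again. The following is a minimal sketch of that idea using only the Scala standard library. It does not use Spark or Kafka; BatchRange, FakePartition and CommitAfterOutputDemo are hypothetical names made up for illustration, and BatchRange is only a simplified stand-in for Spark's OffsetRange.

```scala
// Simplified stand-in for an offset range: [fromOffset, untilOffset) in one partition.
// (Hypothetical type for illustration, not Spark's OffsetRange.)
final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

// Hypothetical in-memory partition plus the committed position of one consumer group.
final class FakePartition(records: Vector[String]) {
  private var committed: Long = 0L

  // The next batch always starts at the last committed offset.
  def nextBatch(maxRecords: Int): (BatchRange, Vector[String]) = {
    val until = math.min(committed + maxRecords, records.length.toLong)
    val range = BatchRange("xiaoniu", 0, committed, until)
    (range, records.slice(committed.toInt, until.toInt))
  }

  def commit(range: BatchRange): Unit = committed = range.untilOffset

  def committedOffset: Long = committed
}

object CommitAfterOutputDemo {
  // Runs one batch and commits only if the output step succeeded.
  def runBatch(p: FakePartition, output: Vector[String] => Unit): Boolean = {
    val (range, batch) = p.nextBatch(maxRecords = 2)
    try {
      output(batch)   // write the results somewhere
      p.commit(range) // only then record progress
      true
    } catch {
      case _: RuntimeException => false // nothing committed, the batch will be re-read
    }
  }

  def main(args: Array[String]): Unit = {
    val p = new FakePartition(Vector("a", "b", "c"))
    runBatch(p, batch => println(batch.mkString(" ")))        // succeeds, commits offset 2
    runBatch(p, _ => throw new RuntimeException("sink down")) // fails, offset stays at 2
    runBatch(p, batch => println(batch.mkString(" ")))        // same range is read again
    println(p.committedOffset)
  }
}
```

Because a failed batch is replayed from the last committed offset, the output step may see the same records more than once, so it should be idempotent if duplicates matter.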

Project dependencies

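dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    // Note: Spark artifacts should normally share one Scala suffix and one Spark version.
    // Here core and streaming are 2.1.0 while the Kafka integration is 2.2.0.
    compile (group: 'org.apache.spark', name: 'spark-core_2.10', version: '2.1.0')
    compile (group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '2.1.0')
    compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.10', version: '2.2.0'
}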

For details, see the official documentation:

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#storing-offsets


Created at: 2019-03-25 19:41:58
