Kafka is a kind of message middleware and a distributed streaming platform, used for building real-time data pipelines and streaming applications. It is horizontally scalable, fault tolerant, and wicked fast.
Kafka features
The message model in Kafka
A producer sends records to a topic in Kafka. A topic can have multiple partitions, and records are ultimately stored in partitions; consumers read records from a topic's partitions.
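As a minimal sketch of this vocabulary in Go (using the sarama client that the examples later in this post rely on; the partition and offset values below are made up for illustration):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// A producer addresses a record by topic only; the broker assigns the
	// partition (or a partitioner picks one) and the next offset within it.
	outgoing := &sarama.ProducerMessage{
		Topic: "test",
		Value: sarama.StringEncoder("hello kafka"),
	}
	fmt.Printf("outgoing: topic=%s value=%v\n", outgoing.Topic, outgoing.Value)

	// A consumer reads the record back fully addressed as (topic, partition, offset).
	// Partition 0 and offset 42 are illustrative values only.
	incoming := &sarama.ConsumerMessage{Topic: "test", Partition: 0, Offset: 42, Value: []byte("hello kafka")}
	fmt.Printf("incoming: topic=%s partition=%d offset=%d value=%s\n",
		incoming.Topic, incoming.Partition, incoming.Offset, incoming.Value)
}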
1: Install on Mac
brew install kafka
Kafka depends on zookeeper, so installing Kafka also installs zookeeper.
Kafka installation directory: /usr/local/Cellar/kafka
Kafka configuration directory: /usr/local/etc/kafka
Kafka server configuration file: /usr/local/etc/kafka/server.properties
zookeeper configuration file: /usr/local/etc/kafka/zookeeper.properties
# Important settings in server.properties
broker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/var/lib/kafka-logs
# zookeeper.properties
dataDir=/usr/local/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0
2: Start zookeeper
# Start zookeeper in a new terminal
cd /usr/local/Cellar/kafka/1.0.0
./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
3: Start Kafka
# Start Kafka in a new terminal; note that zookeeper must be running before you start Kafka
cd /usr/local/Cellar/kafka/1.0.0
./bin/kafka-server-start /usr/local/etc/kafka/server.properties
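Once the broker is up, you can verify it programmatically. A minimal sketch with the sarama Go client, using the broker address from server.properties above:

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Connect to the broker started above (advertised.listeners in server.properties).
	client, err := sarama.NewClient([]string{"127.0.0.1:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalf("broker not reachable: %v", err)
	}
	defer client.Close()

	// Listing brokers and topics confirms the server is answering.
	for _, b := range client.Brokers() {
		fmt.Println("broker:", b.Addr())
	}
	topics, err := client.Topics()
	if err != nil {
		log.Fatalf("Topics: %v", err)
	}
	fmt.Println("topics:", topics)
}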
4: Create a topic
# Create a topic in a new terminal
cd /usr/local/Cellar/kafka/1.0.0
# Create a topic named "test" with 1 partition and 1 replica
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
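The same topic can also be created from Go. A minimal sketch using sarama's ClusterAdmin, assuming a sarama release recent enough to include it (broker address and Kafka version taken from the setup above):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0 // topic creation requires the broker protocol version

	// ClusterAdmin talks to the broker directly; no zookeeper connection needed.
	admin, err := sarama.NewClusterAdmin([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Fatalf("NewClusterAdmin: %v", err)
	}
	defer admin.Close()

	// Create "test" with 1 partition and 1 replica, mirroring the CLI command above.
	if err := admin.CreateTopic("test", &sarama.TopicDetail{
		NumPartitions:     1,
		ReplicationFactor: 1,
	}, false); err != nil {
		log.Fatalf("CreateTopic: %v", err)
	}
}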
5: View topics
# After a topic is created, list all topics
./bin/kafka-topics --list --zookeeper localhost:2181
# Show the details of a specific topic
./bin/kafka-topics --describe --zookeeper localhost:2181 --topic <name>
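Listing topics also has a programmatic counterpart; a sketch with the same assumed ClusterAdmin API:

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V1_0_0_0

	admin, err := sarama.NewClusterAdmin([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		log.Fatalf("NewClusterAdmin: %v", err)
	}
	defer admin.Close()

	// Equivalent of `kafka-topics --list`, plus part of `--describe`.
	topics, err := admin.ListTopics()
	if err != nil {
		log.Fatalf("ListTopics: %v", err)
	}
	for name, detail := range topics {
		fmt.Printf("topic=%s partitions=%d replication=%d\n",
			name, detail.NumPartitions, detail.ReplicationFactor)
	}
}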
6: Send messages
# Start a producer in a new terminal to send messages to the Kafka server; each line typed is one message
./bin/kafka-console-producer --broker-list localhost:9092 --topic test
This is a message
This is another message
7: Consume messages (receive messages)
# Start a consumer in a new terminal to receive messages
cd /usr/local/Cellar/kafka/1.0.0
./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
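A Go equivalent of this console consumer, as a minimal sketch using sarama's partition consumer (topic "test" has a single partition 0, per step 4; --from-beginning corresponds to OffsetOldest):

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalf("NewConsumer: %v", err)
	}
	defer consumer.Close()

	// Read partition 0 of "test" from the oldest stored offset.
	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
	if err != nil {
		log.Fatalf("ConsumePartition: %v", err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		fmt.Printf("partition=%d offset=%d value=%s\n", msg.Partition, msg.Offset, msg.Value)
	}
}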
8: Send messages from the producer
Each line (any characters) typed in the producer terminal opened in step 6 counts as one message once you press Enter; the consumer terminal from step 7 then displays the message you just typed.
The Go examples below use the sarama client (github.com/Shopify/sarama).

1. Synchronous message mode
import ( "github.com/Shopify/sarama" "time" "log" "fmt" "os" "os/signal" "sync") var Address = []string{"10.130.138.164:9092","10.130.138.164:9093","10.130.138.164:9094"} func main() { syncProducer(Address) //asyncProducer1(Address)} //同步消息模式func syncProducer(address []string) { config := sarama.NewConfig() config.Producer.Return.Successes = true config.Producer.Timeout = 5 * time.Second p, err := sarama.NewSyncProducer(address, config) if err != nil { log.Printf("sarama.NewSyncProducer err, message=%s \n", err) return } defer p.Close() topic := "test" srcValue := "sync: this is a message. index=%d" for i:=0; i<10; i++ { value := fmt.Sprintf(srcValue, i) msg := &sarama.ProducerMessage{ Topic:topic, Value:sarama.ByteEncoder(value), } part, offset, err := p.SendMessage(msg) if err != nil { log.Printf("send message(%s) err=%s \n", value, err) }else { fmt.Fprintf(os.Stdout, value + "发送成功,partition=%d, offset=%d \n", part, offset) } time.Sleep(2*time.Second) }}
2. Asynchronous message mode
// SaramaProducer sends messages asynchronously: Input() enqueues messages and
// acknowledgements come back on the Successes()/Errors() channels.
// (Same package and imports as the synchronous example above.)
func SaramaProducer() {
	config := sarama.NewConfig()
	// Wait for all in-sync replicas to persist the message before acknowledging.
	config.Producer.RequiredAcks = sarama.WaitForAll
	// Send each message to a random partition.
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// Deliver success/failure responses on the channels; Successes() only matters
	// when RequiredAcks is not NoResponse.
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	// Set the Kafka version in use. Below V0_10_0_0 the message timestamp has no
	// effect; producer and consumer must be configured consistently.
	// Note: with a wrong version Kafka returns confusing errors and sends fail.
	config.Version = sarama.V0_10_0_1

	fmt.Println("start make producer")
	// Build an asynchronous producer from this config.
	producer, e := sarama.NewAsyncProducer([]string{"182.61.9.153:6667", "182.61.9.154:6667", "182.61.9.155:6667"}, config)
	if e != nil {
		fmt.Println(e)
		return
	}
	defer producer.AsyncClose()

	// Drain whichever channel has data in a separate goroutine.
	fmt.Println("start goroutine")
	go func(p sarama.AsyncProducer) {
		for {
			select {
			case <-p.Successes():
				//fmt.Println("offset: ", suc.Offset, "timestamp: ", suc.Timestamp.String(), "partitions: ", suc.Partition)
			case fail := <-p.Errors():
				fmt.Println("err: ", fail.Err)
			}
		}
	}(producer)

	var value string
	for i := 0; ; i++ {
		time.Sleep(500 * time.Millisecond)
		time11 := time.Now()
		value = "this is a message 0606 " + time11.Format("15:04:05")
		// The message to send and its topic.
		// Note: msg must be a freshly constructed variable on every iteration;
		// reusing one would make every message in a batch carry the same content.
		msg := &sarama.ProducerMessage{
			Topic: "0606_test",
		}
		// Convert the string into a byte slice.
		msg.Value = sarama.ByteEncoder(value)
		//fmt.Println(value)
		// Send via the input channel.
		producer.Input() <- msg
	}
}
Cluster mode implementation
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"

	"github.com/Shopify/sarama"
	cluster "github.com/bsm/sarama-cluster" // consumer-group support for sarama
)

var Address = []string{"10.130.138.164:9092", "10.130.138.164:9093", "10.130.138.164:9094"}

func main() {
	topic := []string{"test"}
	var wg = &sync.WaitGroup{}
	wg.Add(2)
	// Broadcast-style consumption: two consumers in different groups each
	// receive every message on the topic.
	go clusterConsumer(wg, Address, topic, "group-1") // broadcast consumer 1
	go clusterConsumer(wg, Address, topic, "group-2") // broadcast consumer 2
	wg.Wait()
}

// clusterConsumer consumes from a broker cluster as a member of groupId.
func clusterConsumer(wg *sync.WaitGroup, brokers, topics []string, groupId string) {
	defer wg.Done()
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true
	config.Consumer.Offsets.Initial = sarama.OffsetNewest

	// init consumer
	consumer, err := cluster.NewConsumer(brokers, groupId, topics, config)
	if err != nil {
		log.Printf("%s: cluster.NewConsumer err, message=%s \n", groupId, err)
		return
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("%s:Error: %s\n", groupId, err.Error())
		}
	}()

	// consume notifications (rebalance events)
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("%s:Rebalanced: %+v \n", groupId, ntf)
		}
	}()

	// consume messages, watch signals
	var successes int
Loop:
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				fmt.Fprintf(os.Stdout, "%s:%s/%d/%d\t%s\t%s\n", groupId, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "") // mark message as processed
				successes++
			}
		case <-signals:
			break Loop
		}
	}
	fmt.Fprintf(os.Stdout, "%s consume %d messages \n", groupId, successes)
}