配置相关依赖包:
- mkdir -p $GOPATH/src/golang.org/x
- cd $GOPATH/src/golang.org/x
- git clone https://github.com/golang/net.git
- git clone https://github.com/golang/crypto.git
- go get github.com/Shopify/sarama
写入消息到 Kafka
- package main
-
- import (
- "fmt"
- "github.com/Shopify/sarama"
- "time"
- )
-
- func main() {
- //初始化配置
- config := sarama.NewConfig()
- config.Producer.RequiredAcks = sarama.WaitForAll
- config.Producer.Partitioner = sarama.NewRandomPartitioner
- config.Producer.Return.Successes = true
- //生产者
- client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
- if err != nil {
- fmt.Println("producer close,err:", err)
- return
- }
-
- defer client.Close()
-
- var n int=0
- for n<20{
- n++
- //创建消息
- msg := &sarama.ProducerMessage{}
- msg.Topic = "topic_title"
- msg.Value = sarama.StringEncoder("this is a good test,hello gopher.cc!!")
- //发送消息
- pid, offset, err := client.SendMessage(msg)
- if err != nil {
- fmt.Println("send message failed,", err)
- return
- }
- fmt.Printf("pid:%v offset:%v\n,", pid, offset)
- time.Sleep(10 * time.Millisecond)
- }
- }
2 条评论
杰哥,猜猜我是谁~
猜不到哦