package main
import (
"fmt"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// 发送信息到队列
func Send(content, topic string) {
// 构造生产配置
configMap := &kafka.ConfigMap{
"bootstrap.servers": "mq-proxy.datasail.volces.com:9093",
"security.protocol": "SASL_PLAINTEXT",
"acks": "1",
"batch.size": "16384",
}
// 配置SASL认证
configMap.SetKey("sasl.mechanism", "PLAIN")
configMap.SetKey("sasl.username", "123456")
configMap.SetKey("sasl.password", "456789")
// 创建一个Kafka生产者对象
producer, err := kafka.NewProducer(configMap)
if err != nil {
fmt.Println("NewProducer", err)
return
}
// 注意:这里没有包含生产者和消费者的代码示例,只是展示了连接的创建
// 关闭客户端连接
defer producer.Close()
msg := &kafka.Message{
// 消息写入位置
TopicPartition: kafka.TopicPartition{
// 消息需要写入的Topic名称
Topic: &topic,
// 消息写入的分区编号,可以指定Topic特定的某一分区写入,或者设置为kafka.PartitionAny由系统自行选择
Partition: kafka.PartitionAny,
},
Value: []byte(content),
// 消息的属性值,作为额外的扩展属性,可以为nil
Headers: []kafka.Header{
{Key: "service", Value: []byte("kafka")},
{Key: "version", Value: []byte("2.2.0")},
},
}
err = producer.Produce(msg, producer.Events())
if err != nil {
fmt.Println("Send-mq-err", err)
return
}
producer.Flush(10000)
return
}
func main() {
str := `This is test 222222222!!!!!!`
Send(str, "topicName")
}