package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) // 发送信息到队列 func Send(content, topic string) { // 构造生产配置 configMap := &kafka.ConfigMap{ "bootstrap.servers": "mq-proxy.datasail.volces.com:9093", "security.protocol": "SASL_PLAINTEXT", "acks": "1", "batch.size": "16384", } // 配置SASL认证 configMap.SetKey("sasl.mechanism", "PLAIN") configMap.SetKey("sasl.username", "123456") configMap.SetKey("sasl.password", "456789") // 创建一个Kafka生产者对象 producer, err := kafka.NewProducer(configMap) if err != nil { fmt.Println("NewProducer", err) return } // 注意:这里没有包含生产者和消费者的代码示例,只是展示了连接的创建 // 关闭客户端连接 defer producer.Close() msg := &kafka.Message{ // 消息写入位置 TopicPartition: kafka.TopicPartition{ // 消息需要写入的Topic名称 Topic: &topic, // 消息写入的分区编号,可以指定Topic特定的某一分区写入,或者设置为kafka.PartitionAny由系统自行选择 Partition: kafka.PartitionAny, }, Value: []byte(content), // 消息的属性值,作为额外的扩展属性,可以为nil Headers: []kafka.Header{ {Key: "service", Value: []byte("kafka")}, {Key: "version", Value: []byte("2.2.0")}, }, } err = producer.Produce(msg, producer.Events()) if err != nil { fmt.Println("Send-mq-err", err) return } producer.Flush(10000) return } func main() { str := `This is test 222222222!!!!!!` Send(str, "topicName") }