Golang 连接火山云 datasail kafka 生产消息

Jackey Golang 379 次浏览 , 没有评论
package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// 发送信息到队列
func Send(content, topic string) {
    // 构造生产配置
    configMap := &kafka.ConfigMap{
        "bootstrap.servers": "mq-proxy.datasail.volces.com:9093",
        "security.protocol": "SASL_PLAINTEXT",
        "acks":              "1",
        "batch.size":        "16384",
    }

    // 配置SASL认证
    configMap.SetKey("sasl.mechanism", "PLAIN")
    configMap.SetKey("sasl.username", "123456")
    configMap.SetKey("sasl.password", "456789")

    // 创建一个Kafka生产者对象
    producer, err := kafka.NewProducer(configMap)
    if err != nil {
        fmt.Println("NewProducer", err)
        return
    }

    // 注意:这里没有包含生产者和消费者的代码示例,只是展示了连接的创建
    // 关闭客户端连接
    defer producer.Close()
    msg := &kafka.Message{
        // 消息写入位置
        TopicPartition: kafka.TopicPartition{
            // 消息需要写入的Topic名称
            Topic: &topic,
            // 消息写入的分区编号,可以指定Topic特定的某一分区写入,或者设置为kafka.PartitionAny由系统自行选择
            Partition: kafka.PartitionAny,
        },
        Value: []byte(content),
        // 消息的属性值,作为额外的扩展属性,可以为nil
        Headers: []kafka.Header{
            {Key: "service", Value: []byte("kafka")},
            {Key: "version", Value: []byte("2.2.0")},
        },
    }

    err = producer.Produce(msg, producer.Events())
    if err != nil {
        fmt.Println("Send-mq-err", err)
        return
    }

    producer.Flush(10000)
    return
}

func main() {
    str := `This is test 222222222!!!!!!`
    Send(str, "topicName")
}

 

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

Go