使用库:https://github.com/aws/aws-msk-iam-sasl-signer-python
安装方式参考:https://github.com/aws/aws-msk-iam-sasl-signer-python/blob/main/docs/installation.rst
最终代码:
#!/usr/bin/python3
from kafka import KafkaProducer
import socket
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
class MSKTokenProvider():
    """Token provider that produces Amazon MSK IAM auth tokens.

    An instance is passed to ``KafkaProducer`` as
    ``sasl_oauth_token_provider``; the only method it exposes is
    :meth:`token`.

    Args:
        region: AWS region used when signing the token. Defaults to
            ``'ap-southeast-1'``, the region this script was written for.
    """

    def __init__(self, region: str = 'ap-southeast-1'):
        self.region = region

    def token(self) -> str:
        """Generate and return an auth token for ``self.region``.

        ``MSKAuthTokenProvider.generate_auth_token`` returns a pair; only the
        first element (the token) is used here, the second is discarded.
        """
        auth_token, _ = MSKAuthTokenProvider.generate_auth_token(self.region)
        return auth_token
# Provider object the Kafka client asks for an auth token.
tp = MSKTokenProvider()

# Producer configured for SASL_SSL with the OAUTHBEARER mechanism; tokens come
# from `tp`. The `xxx...` broker hostnames look like placeholders and must be
# replaced with the real bootstrap brokers.
producer = KafkaProducer(
    bootstrap_servers='xxx.ap-southeast-1.amazonaws.com:9098,xxx.ap-southeast-1.amazonaws.com:9098',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    # Identify this client to the brokers by the local host name.
    client_id=socket.gethostname(),
)

message = b'message body'
topic = "request"

try:
    # send() is asynchronous and returns a future; keep it so that a failed
    # delivery is not silently reported as success.
    future = producer.send(topic, message)
    producer.flush()
    # Raises if the record could not be delivered, so the success message
    # below is only printed for a confirmed send.
    future.get(timeout=10)
    print("Message produced!")
finally:
    # Always release the producer's network resources, even on failure.
    producer.close()