使用库:github.com/aws/aws-msk-iam-sasl-signer-python
安装方式参考:github.com/aws/aws-msk-iam-sasl-signer-python/blob/main/docs/installation.rst
最终代码:
#!/usr/bin/python3 from kafka import KafkaProducer import socket from aws_msk_iam_sasl_signer import MSKAuthTokenProvider class MSKTokenProvider(): def token(self): token, _ = MSKAuthTokenProvider.generate_auth_token('ap-southeast-1') return token tp = MSKTokenProvider() producer = KafkaProducer( bootstrap_servers='xxx.ap-southeast-1.amazonaws.com:9098,xxx.ap-southeast-1.amazonaws.com:9098', security_protocol='SASL_SSL', sasl_mechanism='OAUTHBEARER', sasl_oauth_token_provider=tp, client_id=socket.gethostname(), ) message = b'message body' topic = "request" producer.send(topic, message) producer.flush() print("Message produced!") producer.close()