验证方式 | Kafka版本 | 特点 |
SASL/PLAIN | | 不能动态增加用户 |
SASL/SCRAM | | 可以动态增加用户 |
SASL/Kerberos | | 需要独立部署验证服务 |
SASL/OAUTHBEARER | 2.0.0 | 需自己实现接口实现token的创建和验证,需要额外Oauth服务 |
2)SSL加密: 使用SSL加密在代理和客户端之间,代理之间或代理和工具之间传输的数据。
vi config/server.properties
#配置zookeeper管理kafka的路径 zookeeper.connect=localhost:2181 #配置kafka的监听端口(SASL_PLAINTEXT:动态增加用户协议,PLAINTEXT 不能动态增加用户) listeners=SASL_PLAINTEXT://:9092 #把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP(SASL_PLAINTEXT:动态增加用户协议,PLAINTEXT 不能动态增加用户) advertised.listeners=SASL_PLAINTEXT://你外网的ip:9092 #使用的认证协议(SASL_PLAINTEXT:动态增加用户协议,PLAINTEXT 不能动态增加用户) security.inter.broker.protocol=SASL_PLAINTEXT #SASL机制 sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN #完成身份验证的类 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #如果没有找到ACL(访问控制列表)配置,则允许任何操作。 allow.everyone.if.no.acl.found=false #需要开启设置超级管理员,设置visitor用户为超级管理员 super.users=User:visitor
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL num.partitions=1 # The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168 #log.retention.bytes=1073741824 When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. ",,". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=18000 ############################# Group Coordinator Settings ############################# # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 #配置zookeeper管理kafka的路径 zookeeper.connect=localhost:2181 #配置kafka的监听端口(SASL_PLAINTEXT:动态增加用户协议,PLAINTEXT 不能动态增加用户) listeners=SASL_PLAINTEXT://:9092 #把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP(SASL_PLAINTEXT:动态增加用户协议,PLAINTEXT 不能动态增加用户) advertised.listeners=SASL_PLAINTEXT://你外网的ip:9092 #使用的认证协议(SASL_PLAINTEXT:动态增加用户协议,PLAINTEXT 不能动态增加用户) security.inter.broker.protocol=SASL_PLAINTEXT #SASL机制 sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN #完成身份验证的类 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer #如果没有找到ACL(访问控制列表)配置,则允许任何操作。 allow.everyone.if.no.acl.found=false #需要开启设置超级管理员,设置visitor用户为超级管理员 super.users=User:visitor
mkdir pass
创建 kafka_server_jaas.conf 文件
vi kafka_server_jaas.conf
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="visitor" password="qaz@123" user_visitor="qaz@123"; };
vi bin/kafka-server-start.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka2.12/pass/kafka_server_jaas.conf"
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="visitor" password="qaz@123"; };
vi bin/kafka-console-producer.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka2.12/pass/kafka_client_jaas.conf"
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
vi bin/kafka-console-consumer.sh
export KAFKA_OPTS=" -Djava.security.auth.login.config=/usr/local/kafka2.12/pass/kafka_client_jaas.conf"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testTopic --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN
# # 环境选择器 # # @author:tongyao # spring: kafka: bootstrap-servers: # 配置用户名密码 properties: security.protocol: SASL_PLAINTEXT sasl.mechanism: PLAIN # SASL/PLAIN 不能动态增加用户 #sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="visitor" password="qaz@123"; # SASL/SCRAM 可以动态增加用户 sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="visitor" password="qaz@123"; producer: # 发生错误后,消息重发的次数。 retries: 0 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 acks: 1 consumer: # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 auto-offset-reset: earliest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # 在侦听器容器中运行的线程数。 concurrency: 5 #listner负责ack,每调用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false server: port: 8899