为了kafka集群的安全,我们一般会开启认证,kafka的认证方式有多种,详细见http://kafka.apache.org/documentation/#security, 这里只讲下sasl认证方式
在zookeeper安装根目录的conf目录下,创建zk_server_jaas.conf, 文件内容如下
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_kafka="kafka-secret";
};
user_kafka="kafka-secret"; 创建用户kafka, 密码是"kafka-secret", 该账号用于kafka broker与zookeeper连接的时候的认证
修改zoo.cfg, 添加一下内容
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
因为认证的时候用到包org.apache.kafka.common.security.plain.PlainLoginModule, 这个是kafka-client.jar里面,所有需要将相应的jar拷贝到 zookeeper安装根目录的lib目录下, 大概要copy这些jar
kafka-clients-2.0.0.jar
lz4-java-1.4.1.jar
osgi-resource-locator-1.0.1.jar
slf4j-api-1.7.25.jar
snappy-java-1.1.7.1.jar
然后就是修改zk的启动参数, 修改 bin/zkEnv.sh, 在文件尾加上
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "
在安装根目录的config目录下, 创建kafka_server_jaas.conf, 内容如下
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka-secret";
};
KafkaServer段里面配置了broker之间的认证配置以及client和broker之间的认证配置
KafkaServer.username, KafkaServer.password用于broker之间的相互认证
KafkaServer.user_admin和KafkaServer.user_alice用于client和broker之间的认证, 下面我们client里面都用用户alice进行认证
Client段里面定义username和password用于broker与zookeeper连接的认证
修改 config/server.properties
# 本例使用SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://127.0.0.1:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 配置ACL入口类
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# default false | true to accept all the users to use it.
# allow.everyone.if.no.acl.found=true
# 设置本例中admin为超级用户
super.users=User:admin
修改kafka启动脚本, 添加 java.security.auth.login.config 环境变量。打开 bin/kafka-server-start.sh
将最后一句
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
修改为
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_server_jaas.conf kafka.Kafka "$@"
在kakfka根目录/config/下创建kafka_client_jaas.conf, 内容如下
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="alice"
password="alice-secret";
};
producer 配置修改
修改 producer脚本启动参数, 打开bin/kafka-console-producer.sh
将最后一行
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
修改为
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
创建一个producer.config 为 console producer指定下面两个属性
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
测试
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config producer.config
consumer配置修改
修改 consume脚本启动参数, 打开bin/kafka-console-consumer.sh
将最后一行
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
修改为
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"
创建一个consumer.config 为 console consumer指定下面三个属性
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group
测试
上面修改了console-producer 以及console-consumer的配置以及启动脚本之后,会发现并不能正常的进行消息生成和消费,会提示认证失败,这个时候就需要对用户进行权限分配了
为topic test_tp1 添加生产者用户alice
[root@test kafka_cluster_ss]# k1/bin/kafka-acls.sh --authorizer-
properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal
User:alice --producer --topic test_tp1
Adding ACLs for resource `Topic:LITERAL:test_tp1`:
User:alice has Allow permission for operations: Describe from hosts: *
User:alice has Allow permission for operations: Create from hosts: *
User:alice has Allow permission for operations: Write from hosts: *
Current ACLs for resource `Topic:LITERAL:test_tp1`:
User:alice has Allow permission for operations: Describe from hosts: *
User:alice has Allow permission for operations: Create from hosts: *
User:alice has Allow permission for operations: Write from hosts: *
为topic test_tp1添加消费者用户alice, 消费组位 test_group
[root@test kafka_cluster_ss]# k1/bin/kafka-acls.sh --authorizer-properties
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal User:alice --
consumer --topic test_tp1 --group test_group
Adding ACLs for resource `Topic:LITERAL:test_tp1`:
User:alice has Allow permission for operations: Describe from hosts: *
User:alice has Allow permission for operations: Read from hosts: *
Adding ACLs for resource `Group:LITERAL:test_group`:
User:alice has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:LITERAL:test_tp1`:
User:alice has Allow permission for operations: Describe from hosts: *
User:alice has Allow permission for operations: Create from hosts: *
User:alice has Allow permission for operations: Write from hosts: *
User:alice has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Group:LITERAL:test_group`:
User:alice has Allow permission for operations: Read from hosts: *
如果不想限制group,那么可以设置 --group=*
* 生产者测试
```bash
[root@test kafka_cluster_ss]# k1/bin/kafka-console-producer.sh --broker-list
localhost:9092,localhost:9093,localhost:9094 --topic test_tp1 --producer.config producer.config
>s
>ss1
>ss2
>
```
* 消费者测试
```csharp
[root@test kafka_cluster_ss]# k1/bin/kafka-console-consumer.sh --bootstrap-server
localhost:9092,localhost:9093,localhost:9094 --topic test_tp1 --from-beginning --consumer.config
consumer.config
s1
ss1
ss2
```
添加acl
假设你要添加一个acl “以允许198.51.100.0和198.51.100.1,Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限” 。可通过以下命令实现:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
默认情况下,所有的principal在没有一个明确的对资源操作访问的acl都是拒绝访问的。在极少的情况下,acl
允许访问所有的资源,但一些principal我们可以使用 --deny-principal 和 --deny-host来拒绝访问。例如,如
果我们想让所有用户读取Test-topic,只拒绝IP为198.51.100.3的User:BadBob,我们可以使用下面的命令:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
需要注意的是--allow-host和deny-host仅支持IP地址(主机名不支持)。上面的例子中通过指定--topic [topic-name]作为资源选项添加ACL到一个topic。同样,用户通过指定--cluster和通过指定--group [group-name]消费者组添加ACL。
删除acl
删除和添加是一样的,--add换成--remove选项,要删除第一个例子中添加的,可以使用下面的命令:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
acl列表
我们可以通过--list选项列出所有资源的ACL。假设要列出Test-topic,我们可以用下面的选项执行CLI所有的ACL:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
添加或删除作为生产者或消费者的principal
acl管理添加/移除一个生产者或消费者principal是最常见的使用情况,所以我们增加更便利的选项处理这些情况。为主题Test-topic添加一个生产者User:Bob,我们可以执行以下命令:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
同样,添加Alice作为主题Test-topic的消费者,用消费者组为Group-1,我们只用 --consumer 选项:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1
注意,消费者的选择,我们还必须指定消费者组。从生产者或消费者角色删除主体,我们只需要通过--remove选项。
librdkafkacpp
std::string broker_list = "localhost:9092,localhost:9093,localhost:9094";
RdKafka::Conf* global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
global_conf->set("metadata.broker.list", broker_list, err_string);
global_conf->set("security.protocol", "sasl_plaintext", err_string);
global_conf->set("sasl.mechanisms", "PLAIN", err_string);
global_conf->set("sasl.username", username.c_str(), err_string);
global_conf->set("sasl.password", password.c_str(), err_string);
Original url: Access
Created at: 2020-03-11 18:26:28
Category: default
Tags: none
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论