Kafka Access Control - 简书


To secure a Kafka cluster, we usually enable authentication. Kafka supports several authentication mechanisms (see http://kafka.apache.org/documentation/#security for details); this article covers only SASL authentication.

1. Scope of Kafka authentication

  • between Kafka clients and Kafka servers (brokers)
  • between brokers
  • between brokers and ZooKeeper

2. Steps to enable authentication

2.1 ZooKeeper authentication configuration

  • In the conf directory under the ZooKeeper installation root, create zk_server_jaas.conf with the following content

    Server {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="admin"
       password="admin-secret"
       user_kafka="kafka-secret";
    };
    

user_kafka="kafka-secret"; creates a user named kafka with password "kafka-secret"; this account is used by Kafka brokers to authenticate when connecting to ZooKeeper.
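The structure of such a Server section can be sketched with a small helper (hypothetical, not part of ZooKeeper or Kafka tooling) that renders it from a Python dict of accounts:

```python
# Hypothetical helper: render a JAAS "Server" section for zk_server_jaas.conf
# from a {user: password} mapping. Illustrative only.
def render_zk_jaas(admin_user, admin_password, users):
    lines = [
        "Server {",
        "   org.apache.kafka.common.security.plain.PlainLoginModule required",
        f'   username="{admin_user}"',
        f'   password="{admin_password}"',
    ]
    # each user_<name>="<password>" entry defines an account clients may use
    lines += [f'   user_{name}="{pw}"' for name, pw in users.items()]
    # JAAS terminates the last option with ';' and the section with '};'
    lines[-1] += ";"
    lines.append("};")
    return "\n".join(lines)

print(render_zk_jaas("admin", "admin-secret", {"kafka": "kafka-secret"}))
```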

  • Modify zoo.cfg and add the following

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider  
    requireClientAuthScheme=sasl  
    jaasLoginRenew=3600000
    
  • Authentication uses the class org.apache.kafka.common.security.plain.PlainLoginModule, which ships in the Kafka client jar, so the relevant jars must be copied into the lib directory under the ZooKeeper installation root. Roughly these jars are needed:

    kafka-clients-2.0.0.jar
    lz4-java-1.4.1.jar
    osgi-resource-locator-1.0.1.jar
    slf4j-api-1.7.25.jar
    snappy-java-1.1.7.1.jar
    
  • Then adjust ZooKeeper's startup parameters: edit bin/zkEnv.sh and append at the end of the file

    SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "
    
  • Then restart the ZooKeeper service (for a cluster, apply the same changes on every node)

2.2 Kafka broker authentication configuration

  • In the config directory under the installation root, create kafka_server_jaas.conf with the following content

    KafkaServer {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="admin"
        password="admin-secret"
        user_admin="admin-secret"
        user_alice="alice-secret";
    };
    
    Client {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka"
        password="kafka-secret";
    };
    

The KafkaServer section configures both broker-to-broker authentication and client-to-broker authentication.
KafkaServer.username and KafkaServer.password are used for mutual authentication between brokers.
KafkaServer.user_admin and KafkaServer.user_alice define the accounts clients use to authenticate to brokers; in the client examples below we authenticate as user alice.
The Client section defines the username and password used when the broker connects to ZooKeeper.
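One PLAIN-specific pitfall worth checking: a broker validates connecting peers (including other brokers) against its own user_<name> entries, so the inter-broker username/password must also appear as a matching user_<name>="<password>" entry — which is why user_admin above duplicates admin's password. A sketch of that consistency check, parsing the JAAS text with a regex (illustrative only):

```python
import re

KAFKA_SERVER_JAAS = '''
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};
'''

def interbroker_credentials_ok(jaas_text):
    # the broker's own login identity (used when connecting to other brokers)
    username = re.search(r'username="([^"]+)"', jaas_text).group(1)
    password = re.search(r'\bpassword="([^"]+)"', jaas_text).group(1)
    # accounts this broker will accept (used when peers connect to it)
    users = dict(re.findall(r'user_(\w+)="([^"]+)"', jaas_text))
    return users.get(username) == password

print(interbroker_credentials_ok(KAFKA_SERVER_JAAS))
```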

  • Modify config/server.properties

    # this example uses SASL_PLAINTEXT
    listeners=SASL_PLAINTEXT://127.0.0.1:9092
    security.inter.broker.protocol=SASL_PLAINTEXT
    sasl.enabled.mechanisms=PLAIN
    sasl.mechanism.inter.broker.protocol=PLAIN
    # ACL authorizer (entry-point) class
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    # default false | set to true to allow everyone when no ACL matches
    # allow.everyone.if.no.acl.found=true
    # make admin a superuser for this example
    super.users=User:admin
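These properties have two internal consistency requirements: the inter-broker SASL mechanism must be among the enabled mechanisms, and the inter-broker security protocol must correspond to a configured listener. A minimal sanity-check sketch (illustrative, not Kafka code; ignores listener.security.protocol.map):

```python
# Illustrative sketch: sanity-check the SASL-related server.properties entries.
props = {
    "listeners": "SASL_PLAINTEXT://127.0.0.1:9092",
    "security.inter.broker.protocol": "SASL_PLAINTEXT",
    "sasl.enabled.mechanisms": "PLAIN",
    "sasl.mechanism.inter.broker.protocol": "PLAIN",
    "authorizer.class.name": "kafka.security.auth.SimpleAclAuthorizer",
    "super.users": "User:admin",
}

def sasl_props_consistent(p):
    enabled = [m.strip() for m in p["sasl.enabled.mechanisms"].split(",")]
    listener_protocols = {l.split("://")[0] for l in p["listeners"].split(",")}
    return (p["sasl.mechanism.inter.broker.protocol"] in enabled
            and p["security.inter.broker.protocol"] in listener_protocols)

print(sasl_props_consistent(props))  # True for the values above
```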
    
  • Modify the Kafka startup script to pass the java.security.auth.login.config system property. Open bin/kafka-server-start.sh
    and change the last line

    exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
    

    to

    exec $base_dir/kafka-run-class.sh $EXTRA_ARGS  -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_server_jaas.conf kafka.Kafka "$@"
    
  • Then restart the Kafka server (for a cluster, apply the same change on every broker)

2.3 Kafka client authentication configuration

  • Under the Kafka root in config/, create kafka_client_jaas.conf with the following content

    KafkaClient {
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="alice"
          password="alice-secret";
    };
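As an aside, Kafka clients since 0.10.2 can also pass the JAAS configuration inline via the sasl.jaas.config client property, instead of a separate file plus the -Djava.security.auth.login.config flag. A sketch that builds that one-line value:

```python
def sasl_jaas_config(username, password):
    # inline equivalent of the KafkaClient section above (sasl.jaas.config property)
    return ('org.apache.kafka.common.security.plain.PlainLoginModule required '
            f'username="{username}" password="{password}";')

print(sasl_jaas_config("alice", "alice-secret"))
```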
    
  • Producer configuration changes

    • Modify the producer startup script: open bin/kafka-console-producer.sh
      and change the last line

      exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
      

      to

      exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
      
    • Create a producer.config that specifies the following two properties for the console producer

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=PLAIN
      
    • Test

      $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config producer.config 
      
  • Consumer configuration changes

    • Modify the consumer startup script: open bin/kafka-console-consumer.sh
      and change the last line

      exec $(dirname $0)/kafka-run-class.sh  kafka.tools.ConsoleConsumer "$@"
      

      to

      exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"
      
    • Create a consumer.config that specifies the following three properties for the console consumer

      security.protocol=SASL_PLAINTEXT
      sasl.mechanism=PLAIN
      group.id=test-group
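For programmatic clients the same settings apply. For example, with the kafka-python package (assumption: the package is installed; no broker is contacted here, we only build the configuration), the SASL settings map to constructor keyword arguments:

```python
# Sketch: SASL/PLAIN settings for the kafka-python package. Only the config
# dict is built here; the commented-out usage needs a running broker.
client_config = dict(
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_PLAINTEXT",   # matches security.protocol
    sasl_mechanism="PLAIN",               # matches sasl.mechanism
    sasl_plain_username="alice",
    sasl_plain_password="alice-secret",
)
# from kafka import KafkaConsumer
# consumer = KafkaConsumer("test_tp1", group_id="test-group", **client_config)
print(sorted(client_config))
```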
      
  • Test
    After modifying the console producer/consumer configs and startup scripts as above, you will find that producing and consuming still fail with an authorization error. At this point you need to grant the user ACL permissions.

    • Grant user alice produce rights on topic test_tp1

      [root@test kafka_cluster_ss]# k1/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal User:alice --producer --topic test_tp1
      Adding ACLs for resource `Topic:LITERAL:test_tp1`: 
              User:alice has Allow permission for operations: Describe from hosts: *
              User:alice has Allow permission for operations: Create from hosts: *
              User:alice has Allow permission for operations: Write from hosts: * 
      
      Current ACLs for resource `Topic:LITERAL:test_tp1`: 
             User:alice has Allow permission for operations: Describe from hosts: *
             User:alice has Allow permission for operations: Create from hosts: *
             User:alice has Allow permission for operations: Write from hosts: * 
      
      
    • Grant user alice consume rights on topic test_tp1 with consumer group test_group

      [root@test kafka_cluster_ss]# k1/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal User:alice --consumer --topic test_tp1 --group test_group
      Adding ACLs for resource `Topic:LITERAL:test_tp1`: 
              User:alice has Allow permission for operations: Describe from hosts: *
              User:alice has Allow permission for operations: Read from hosts: * 
      
      Adding ACLs for resource `Group:LITERAL:test_group`: 
             User:alice has Allow permission for operations: Read from hosts: * 
      
      Current ACLs for resource `Topic:LITERAL:test_tp1`: 
             User:alice has Allow permission for operations: Describe from hosts: *
             User:alice has Allow permission for operations: Create from hosts: *
             User:alice has Allow permission for operations: Write from hosts: *
             User:alice has Allow permission for operations: Read from hosts: * 
      
      Current ACLs for resource `Group:LITERAL:test_group`: 
              User:alice has Allow permission for operations: Read from hosts: * 
      
If you do not want to restrict the consumer group, you can pass --group '*' (quote the * so the shell does not expand it).
  • Producer test

    [root@test kafka_cluster_ss]# k1/bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test_tp1 --producer.config producer.config
    >s
    >ss1
    >ss2
    >
    
  • Consumer test

    [root@test kafka_cluster_ss]# k1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test_tp1 --from-beginning --consumer.config consumer.config
    s1
    ss1
    ss2
    

2.4 ACL examples

This part is excerpted from http://orchome.com/185

  • Adding ACLs
    Suppose you want to add an ACL "allowing principals User:Bob and User:Alice to Read and Write on topic Test-topic from hosts 198.51.100.0 and 198.51.100.1". You can do so with the following command:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    

    By default, any principal without an explicit ACL allowing an operation on a resource is denied access. In the rare case where an ACL allows access to everyone but certain principals should be excluded, use --deny-principal and --deny-host. For example, to let all users read Test-topic while denying only User:BadBob from IP 198.51.100.3, use the following command:

     bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
    

    Note that --allow-host and --deny-host only support IP addresses (hostnames are not supported). The example above adds ACLs to a topic by specifying --topic [topic-name] as the resource option. Similarly, you can add ACLs to the cluster by specifying --cluster, or to a consumer group by specifying --group [group-name].
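The evaluation order just described (an explicit deny overrides any allow; with no matching ACL the default is deny unless allow.everyone.if.no.acl.found is set) can be sketched as a toy evaluator. This illustrates the rule only; it is not Kafka's actual SimpleAclAuthorizer code:

```python
# Toy model of ACL evaluation: deny rules override allow rules; with no
# matching rule the default is deny (unless allow.everyone.if.no.acl.found=true
# and no ACLs exist for the resource at all).
def is_allowed(principal, host, allows, denies, allow_everyone_if_no_acl=False):
    def matches(rules):
        return any((p in ("*", principal)) and (h in ("*", host)) for p, h in rules)
    if matches(denies):
        return False
    if matches(allows):
        return True
    return allow_everyone_if_no_acl and not allows and not denies

allows = [("*", "*")]                       # --allow-principal User:* --allow-host *
denies = [("User:BadBob", "198.51.100.3")]  # --deny-principal / --deny-host
print(is_allowed("User:Alice", "198.51.100.1", allows, denies))   # True
print(is_allowed("User:BadBob", "198.51.100.3", allows, denies))  # False
```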

  • Removing ACLs
    Removal works exactly like adding, with --add replaced by --remove. To remove the ACLs added in the first example above, run:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
    
  • Listing ACLs
    You can list the ACLs on a resource with the --list option. To list all ACLs on Test-topic, run the CLI with these options:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
    
  • Adding or removing a principal as producer or consumer
    The most common ACL-management use case is adding or removing a principal as a producer or consumer, so convenience options exist for both. To add User:Bob as a producer of topic Test-topic, run:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
    

    Similarly, to add Alice as a consumer of topic Test-topic with consumer group Group-1, just pass the --consumer option:

    bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Alice --consumer --topic Test-topic --group Group-1
    

    Note that with the --consumer option you must also specify the consumer group. To remove a principal from the producer or consumer role, just pass --remove instead.
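When scripting these grants, composing the argument list programmatically avoids shell-quoting issues (e.g. with --group '*'). A sketch — the script path and ZooKeeper address are placeholders, and the command is only composed here, not executed:

```python
import subprocess  # used only in the commented-out run() call below

def acl_producer_grant(user, topic, zk="localhost:2181"):
    # equivalent of: kafka-acls.sh --authorizer-properties zookeeper.connect=<zk>
    #   --add --allow-principal User:<user> --producer --topic <topic>
    return ["bin/kafka-acls.sh",
            "--authorizer-properties", f"zookeeper.connect={zk}",
            "--add", "--allow-principal", f"User:{user}",
            "--producer", "--topic", topic]

cmd = acl_producer_grant("Bob", "Test-topic")
# subprocess.run(cmd, check=True)  # uncomment on a machine with Kafka installed
print(" ".join(cmd))
```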

2.5 Client-side configuration examples

  • librdkafka C++ (rdkafkacpp)

    // configure SASL/PLAIN on a librdkafka C++ client before creating it
    std::string err_string;  // receives error details from Conf::set()
    std::string broker_list = "localhost:9092,localhost:9093,localhost:9094";
    RdKafka::Conf* global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    global_conf->set("metadata.broker.list", broker_list, err_string);
    global_conf->set("security.protocol", "sasl_plaintext", err_string);
    global_conf->set("sasl.mechanisms", "PLAIN", err_string);
    global_conf->set("sasl.username", username, err_string);
    global_conf->set("sasl.password", password, err_string);
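The same settings in Python via confluent-kafka, which wraps librdkafka, so the configuration keys are identical. Only the config dict is built here; the Producer line is commented out since it needs both the package and a running broker:

```python
# librdkafka configuration keys are shared across its language bindings.
conf = {
    "bootstrap.servers": "localhost:9092,localhost:9093,localhost:9094",
    "security.protocol": "sasl_plaintext",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "alice",
    "sasl.password": "alice-secret",
}
# from confluent_kafka import Producer   # requires the confluent-kafka package
# producer = Producer(conf)
print(sorted(conf))
```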
    

