admin管理员组

文章数量:1301533

I’m trying to configure Apache Kafka 3.8.0 in KRaft mode to use Keycloak 26.1.1 for authentication via SASL/OAUTHBEARER. My Keycloak realm is named Kafka-Auth, and I have a confidential client called kafka-broker with “Service Accounts” enabled and a client secret. However, no matter what I do, Kafka fails at startup with:

> .apache.kafkamon.KafkaException: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
>     ...
Caused by: javax.security.auth.login.LoginException: An internal error occurred while retrieving token from callback handler
at .apache.kafkamon.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:319)
>     ...

If I add unsecuredLoginStringClaim_sub="kafka" to my JAAS config, Kafka starts without errors, but that bypasses real token validation. Obviously, that’s not secure. I want Kafka to actually retrieve and validate JWTs from Keycloak.

I’ve tried:

Different Docker images:

  • apache/kafka:3.8.0
  • confluentinc/cp-kafka 7.8.0

Various environment variables for OAUTH:

  • KAFKA_SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL
  • KAFKA_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL,
  • KAFKA_SASL_OAUTHBEARER_VALID_ISSUER_URI, etc...

KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS=.apache.kafkamon.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler KAFKA_SASL_SERVER_CALLBACK_HANDLER_CLASS=.apache.kafkamon.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler

kafka_server_jaas.conf:

KafkaServer {   
.apache.kafkamon.security.oauthbearer.OAuthBearerLoginModule required       
oauth.client.id="kafka-broker"     
oauth.client.secret="8Dxpc4eh249VtgVJLfk2KbSnTypfWcIw"     
oauth.token.endpoint.uri="http://keycloak:8083/realms/Kafka-Auth/protocol/openid-connect/token"     
oauth.valid.issuer.uri="http://keycloak:8083/realms/Kafka-Auth"     
oauth.jwks.endpoint.uri="http://keycloak:8083/realms/Kafka-Auth/protocol/openid-connect/certs"     
oauth.username.claim="sub"; 
};

Docker Compose:

services:

  keycloak:
    image: quay.io/keycloak/keycloak:26.1.1
    command: start-dev
    environment:
      - KEYCLOAK_ADMIN=admin
      - KEYCLOAK_ADMIN_PASSWORD=pass
    ports:
      - "8083:8080"

  kafka-controller-1:
    image: apache/kafka:3.8.0
    restart: always
    container_name: kafka-controller-1
    environment:
      KAFKA_PROCESS_ROLES: controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT"
      KAFKA_LOG_DIRS: /tmp/kafka-logs
      KAFKA_CONTROLLER_QUORUM_VOTERS: "0@kafka-controller-0:9093"
      KAFKA_NODE_ID: 1
      KAFKA_LISTENERS: "CONTROLLER://kafka-controller-1:9093"
    volumes:
      - kafka_controller_data_1:/tmp/kafka-logs


  kafka-broker-0:
    image: apache/kafka:3.8.0
    container_name: kafka-broker-0
    restart: always
    ports:
      - "9097:9097"
    environment:
      KAFKA_PROCESS_ROLES: broker
      KAFKA_CONTROLLER_QUORUM_VOTERS: "0@kafka-controller-0:9093"
      KAFKA_NODE_ID: 2
      # Enable OAUTH
      KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER
      KAFKA_SASL_MECHANISM_CONTROLLER_PROTOCOL: OAUTHBEARER

      KAFKA_LISTENERS: "PLAINTEXT://kafka-broker-0:9092,SASL_PLAINTEXT://0.0.0.0:9097"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka-broker-0:9092,SASL_PLAINTEXT://localhost:9097"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT"
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

      # OAuth environment variables
      KAFKA_SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL: "http://keycloak:8083/realms/Kafka-Auth/protocol/openid-connect/token"
      KAFKA_SASL_OAUTHBEARER_JWKS_ENDPOINT_URL: "http://keycloak:8083/realms/Kafka-Auth/protocol/openid-connect/certs"
      KAFKA_SASL_OAUTHBEARER_VALID_ISSUER_URI: "http://keycloak:8083/realms/Kafka-Auth"

      # Callback handlers
      KAFKA_SASL_LOGIN_CALLBACK_HANDLER_CLASS: .apache.kafkamon.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
      KAFKA_SASL_SERVER_CALLBACK_HANDLER_CLASS: .apache.kafkamon.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler

      # JAAS file for secrets
      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"

    volumes:
      - ./kafka_server_jaas.conf:/etc/kafka/kafka_server_jaas.conf
    depends_on:
      - keycloak

When Using Insomnia to Test the broker for access token, it works:

{ "access_token": "eyJhbGciOiJSUzI1NiIsInR5...", "expires_in": 300, "refresh_expires_in": 0, "token_type": "Bearer", "not-before-policy": 0, "scope": "profile email" }

本文标签: