PART 1 — WHAT CHANGED IN KRAFT

KRaft (Kafka Raft) replaces ZooKeeper entirely: Kafka nodes now manage their own metadata through the Raft consensus protocol, which affects LDAP integration in several key ways.

Key differences from ZooKeeper mode:

Area                         | ZooKeeper mode              | KRaft mode
-----------------------------|-----------------------------|--------------------------------------
Metadata store               | ZooKeeper ensemble          | Raft log inside controllers
MDS                          | Separate deployment         | Embedded in broker/controller
server.properties key prefix | confluent.metadata.server.* | Same prefix, but no ZK config
Node roles                   | Broker only                 | broker, controller, or combined
process.roles                | Not applicable              | Mandatory new config
Node identity                | broker.id                   | node.id (mandatory, replaces broker.id)
Cluster ID                   | Auto-generated by ZK        | Generated manually with kafka-storage

PART 2 — KRAFT NODE ROLES

KRaft introduces explicit role assignments per node, set via process.roles: each node runs as a broker, a controller, or both.
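In server.properties terms, the three valid assignments (the same values used in Parts 4A and 4B) look like this:

```properties
# Combined mode: one process acts as broker and controller (Part 4A)
process.roles=broker,controller

# Dedicated controller: Raft quorum member only, no client traffic (Part 4B)
process.roles=controller

# Dedicated broker: serves clients, connects to the controller quorum (Part 4B)
process.roles=broker
```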

PART 3 — KRAFT CLUSTER INITIALIZATION (Critical First Step)

Before any broker starts, you must format the storage with a cluster ID. This replaces ZooKeeper's auto-bootstrap.

# Step 1: Generate a unique cluster ID (do this ONCE per cluster)
KAFKA_CLUSTER_ID=$(kafka-storage random-uuid)
echo "Cluster ID: $KAFKA_CLUSTER_ID"
# Example: MkU3OEVBNTcwNTJENDM2Qg
 
# Step 2: Format storage on EVERY node (controllers + brokers)
kafka-storage format \
  -t $KAFKA_CLUSTER_ID \
  -c /etc/kafka/server.properties
 
# Output you want to see:
# Formatting /var/lib/kafka/data with metadata.version 3.6-IV2.

Critical: all nodes in the cluster must be formatted with the same cluster ID. A mismatch means the cluster won't form.
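A quick way to catch a mismatch before chasing election logs is to compare the cluster.id that kafka-storage format wrote into each node's meta.properties. A minimal sketch (the file locations and how you collect them from each host are assumptions about your layout):

```shell
#!/bin/bash
# Compare the cluster.id recorded in each node's meta.properties.

extract_cluster_id() {
  # kafka-storage format writes a cluster.id=<uuid> line into meta.properties
  grep '^cluster.id=' "$1" | cut -d= -f2
}

check_nodes() {
  # Fail on the first file whose cluster ID differs from the first one seen.
  local ref="" id f
  for f in "$@"; do
    id=$(extract_cluster_id "$f")
    if [ -z "$ref" ]; then ref="$id"; fi
    if [ "$id" != "$ref" ]; then
      echo "MISMATCH: $f has $id (expected $ref)"
      return 1
    fi
  done
  echo "OK: all nodes share cluster ID $ref"
}

# Example (collect the files from each node first, e.g. via scp):
# check_nodes node1-meta.properties node2-meta.properties node3-meta.properties
```

Any MISMATCH line identifies the node that needs re-formatting.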


PART 4 — KRAFT SERVER.PROPERTIES: COMPLETE CONFIGS

4A — Combined Mode (Dev / Small Cluster, 3 nodes)

Each node acts as both broker and controller. This is the simplest setup for teams getting started.

Node 1 (server-1.properties):

##########################
# KRaft Identity
##########################
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
 
##########################
# Listeners
##########################
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=SASL_PLAINTEXT://kafka-1:9092
listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT
inter.broker.listener.name=SASL_PLAINTEXT
controller.listener.names=CONTROLLER
 
##########################
# Storage
##########################
log.dirs=/var/lib/kafka/data
metadata.log.dir=/var/lib/kafka/metadata
 
##########################
# SASL/PLAIN via LDAP β€” Authentication
##########################
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
 
# Broker-to-broker JAAS (uses service account)
listener.name.sasl_plaintext.plain.sasl.jaas.config=\
  org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="kafka-service" \
  password="KafkaServicePass123";
 
# LDAP callback handler for client authentication
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=\
  io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler
 
##########################
# LDAP Connection (for authentication)
##########################
ldap.java.naming.provider.url=ldap://ldap-server:389
ldap.java.naming.security.principal=uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
ldap.java.naming.security.credentials=KafkaServicePass123
ldap.java.naming.security.authentication=simple
ldap.java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory
 
ldap.user.search.base=ou=People,dc=company,dc=com
ldap.user.search.filter=(uid={0})
ldap.user.name.attribute=uid
ldap.user.object.class=inetOrgPerson
 
##########################
# MDS (Metadata Service — embedded) + RBAC
##########################
confluent.metadata.server.listeners=http://0.0.0.0:8090
confluent.metadata.server.advertised.listeners=http://kafka-1:8090
confluent.metadata.server.authentication.method=BEARER
confluent.metadata.server.user.store=LDAP
 
# Token keypair for JWT (MDS)
confluent.metadata.server.token.key.path=/etc/kafka/secrets/tokenKeypair.pem
 
# LDAP config for MDS (can reuse same LDAP server)
confluent.metadata.server.ldap.java.naming.provider.url=ldap://ldap-server:389
confluent.metadata.server.ldap.java.naming.security.principal=uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
confluent.metadata.server.ldap.java.naming.security.credentials=KafkaServicePass123
confluent.metadata.server.ldap.java.naming.security.authentication=simple
 
# User lookup
confluent.metadata.server.ldap.user.search.base=ou=People,dc=company,dc=com
confluent.metadata.server.ldap.user.name.attribute=uid
confluent.metadata.server.ldap.user.object.class=inetOrgPerson
confluent.metadata.server.ldap.user.search.filter=(uid={0})
 
# Group lookup (drives group-based RBAC)
confluent.metadata.server.ldap.group.search.base=ou=Groups,dc=company,dc=com
confluent.metadata.server.ldap.group.object.class=groupOfNames
confluent.metadata.server.ldap.group.name.attribute=cn
confluent.metadata.server.ldap.group.member.attribute=member
confluent.metadata.server.ldap.group.member.attribute.pattern=uid=([^,]+).*
confluent.metadata.server.ldap.refresh.interval.ms=60000
 
# Authorizer
confluent.authorizer.access.rule.providers=CONFLUENT
confluent.metadata.server.token.max.lifetime.ms=3600000
 
##########################
# Replication & Defaults
##########################
offsets.topic.replication.factor=3
default.replication.factor=3
min.insync.replicas=2
num.partitions=3

Node 2 — identical except node.id=2, advertised.listeners=SASL_PLAINTEXT://kafka-2:9092, and confluent.metadata.server.advertised.listeners=http://kafka-2:8090.

Node 3 — same pattern with node.id=3 and the kafka-3 hostnames.
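Since nodes 2 and 3 differ from node 1 only in those three values, the per-node files can be generated from one template. A sketch, assuming node.id, the kafka-N broker hostname (port 9092), and the MDS advertised listener (port 8090) are the only per-node differences:

```shell
#!/bin/bash
# Generate server-N.properties from the node-1 template.

make_node_config() {
  local src=$1 node=$2 dst=$3
  sed -e "s/^node.id=.*/node.id=${node}/" \
      -e "s#kafka-1:9092#kafka-${node}:9092#" \
      -e "s#kafka-1:8090#kafka-${node}:8090#" \
      "$src" > "$dst"
  # controller.quorum.voters uses port 9093, so it is left untouched.
}

# make_node_config server-1.properties 2 server-2.properties
# make_node_config server-1.properties 3 server-3.properties
```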


4B — Separated Mode (Production: Dedicated Controllers + Brokers)

Controller nodes (controller.properties) — these nodes do NOT serve clients:

##########################
# KRaft Identity
##########################
process.roles=controller
node.id=1
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
 
##########################
# Listeners (controller-only, no client listener)
##########################
listeners=CONTROLLER://:9093
listener.security.protocol.map=CONTROLLER:PLAINTEXT
controller.listener.names=CONTROLLER
 
##########################
# Storage
##########################
log.dirs=/var/lib/kafka/metadata
metadata.log.dir=/var/lib/kafka/metadata
 
##########################
# No SASL/LDAP on pure controllers
# (they don't accept client connections)
##########################

Broker nodes (broker.properties) — these serve clients and embed MDS:

##########################
# KRaft Identity
##########################
process.roles=broker
node.id=101
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
 
##########################
# Listeners
##########################
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://broker-1:9092
listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT
inter.broker.listener.name=SASL_PLAINTEXT
controller.listener.names=CONTROLLER
 
##########################
# Storage
##########################
log.dirs=/var/lib/kafka/data
 
##########################
# SASL + LDAP Auth
##########################
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
 
listener.name.sasl_plaintext.plain.sasl.jaas.config=\
  org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="kafka-service" \
  password="KafkaServicePass123";
 
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=\
  io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler
 
ldap.java.naming.provider.url=ldap://ldap-server:389
ldap.java.naming.security.principal=uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
ldap.java.naming.security.credentials=KafkaServicePass123
ldap.java.naming.security.authentication=simple
ldap.user.search.base=ou=People,dc=company,dc=com
ldap.user.search.filter=(uid={0})
ldap.user.name.attribute=uid
ldap.user.object.class=inetOrgPerson
 
##########################
# MDS embedded in broker
##########################
confluent.metadata.server.listeners=http://0.0.0.0:8090
confluent.metadata.server.advertised.listeners=http://broker-1:8090
confluent.metadata.server.authentication.method=BEARER
confluent.metadata.server.user.store=LDAP
confluent.metadata.server.token.key.path=/etc/kafka/secrets/tokenKeypair.pem
 
confluent.metadata.server.ldap.java.naming.provider.url=ldap://ldap-server:389
confluent.metadata.server.ldap.java.naming.security.principal=uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
confluent.metadata.server.ldap.java.naming.security.credentials=KafkaServicePass123
confluent.metadata.server.ldap.java.naming.security.authentication=simple
 
confluent.metadata.server.ldap.user.search.base=ou=People,dc=company,dc=com
confluent.metadata.server.ldap.user.name.attribute=uid
confluent.metadata.server.ldap.user.object.class=inetOrgPerson
confluent.metadata.server.ldap.user.search.filter=(uid={0})
 
confluent.metadata.server.ldap.group.search.base=ou=Groups,dc=company,dc=com
confluent.metadata.server.ldap.group.object.class=groupOfNames
confluent.metadata.server.ldap.group.name.attribute=cn
confluent.metadata.server.ldap.group.member.attribute=member
confluent.metadata.server.ldap.group.member.attribute.pattern=uid=([^,]+).*
confluent.metadata.server.ldap.refresh.interval.ms=60000
 
confluent.authorizer.access.rule.providers=CONFLUENT
confluent.metadata.server.token.max.lifetime.ms=3600000

PART 5 — DOCKER COMPOSE: FULL KRAFT CLUSTER

Combined Mode (3-node, dev-ready)

version: '3.8'
 
x-kafka-common: &kafka-common
  image: confluentinc/cp-server:7.6.0
  restart: unless-stopped
  volumes:
    - ./secrets:/etc/kafka/secrets:ro
  environment: &kafka-env
    # LDAP auth
    KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
    KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT"
    KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
    KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
    KAFKA_LISTENER_NAME_SASL_PLAINTEXT_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS: >
      io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler
 
    # LDAP connection
    KAFKA_LDAP_JAVA_NAMING_PROVIDER_URL: ldap://openldap:389
    KAFKA_LDAP_JAVA_NAMING_SECURITY_PRINCIPAL: uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
    KAFKA_LDAP_JAVA_NAMING_SECURITY_CREDENTIALS: KafkaServicePass123
    KAFKA_LDAP_JAVA_NAMING_SECURITY_AUTHENTICATION: simple
    KAFKA_LDAP_USER_SEARCH_BASE: ou=People,dc=company,dc=com
    KAFKA_LDAP_USER_SEARCH_FILTER: "(uid={0})"
    KAFKA_LDAP_USER_NAME_ATTRIBUTE: uid
    KAFKA_LDAP_USER_OBJECT_CLASS: inetOrgPerson
 
    # MDS + RBAC
    KAFKA_CONFLUENT_METADATA_SERVER_AUTHENTICATION_METHOD: BEARER
    KAFKA_CONFLUENT_METADATA_SERVER_USER_STORE: LDAP
    KAFKA_CONFLUENT_METADATA_SERVER_TOKEN_KEY_PATH: /etc/kafka/secrets/tokenKeypair.pem
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_JAVA_NAMING_PROVIDER_URL: ldap://openldap:389
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_JAVA_NAMING_SECURITY_PRINCIPAL: uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_JAVA_NAMING_SECURITY_CREDENTIALS: KafkaServicePass123
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_JAVA_NAMING_SECURITY_AUTHENTICATION: simple
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_USER_SEARCH_BASE: ou=People,dc=company,dc=com
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_USER_NAME_ATTRIBUTE: uid
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_USER_OBJECT_CLASS: inetOrgPerson
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_USER_SEARCH_FILTER: "(uid={0})"
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_GROUP_SEARCH_BASE: ou=Groups,dc=company,dc=com
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_GROUP_OBJECT_CLASS: groupOfNames
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_GROUP_NAME_ATTRIBUTE: cn
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_GROUP_MEMBER_ATTRIBUTE: member
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_GROUP_MEMBER_ATTRIBUTE_PATTERN: "uid=([^,]+).*"
    KAFKA_CONFLUENT_METADATA_SERVER_LDAP_REFRESH_INTERVAL_MS: "60000"
    KAFKA_CONFLUENT_AUTHORIZER_ACCESS_RULE_PROVIDERS: CONFLUENT
 
    # Replication
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
    KAFKA_DEFAULT_REPLICATION_FACTOR: "3"
    KAFKA_MIN_INSYNC_REPLICAS: "2"
    KAFKA_NUM_PARTITIONS: "3"
 
services:
  openldap:
    image: osixia/openldap:1.5.0
    container_name: openldap
    environment:
      LDAP_DOMAIN: company.com
      LDAP_ADMIN_PASSWORD: AdminSecret123
      LDAP_READONLY_USER: "true"
      LDAP_READONLY_USER_PASSWORD: ReadOnly123
    ports:
      - "389:389"
    volumes:
      - ldap_data:/var/lib/ldap
      - ldap_config:/etc/ldap/slapd.d
 
  kafka-1:
    <<: *kafka-common
    container_name: kafka-1
    hostname: kafka-1
    ports:
      - "9092:9092"
      - "8090:8090"
    environment:
      <<: *kafka-env
      KAFKA_NODE_ID: "1"
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093"
      KAFKA_LISTENERS: "SASL_PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "SASL_PLAINTEXT://kafka-1:9092"
      KAFKA_CONFLUENT_METADATA_SERVER_LISTENERS: "http://0.0.0.0:8090"
      KAFKA_CONFLUENT_METADATA_SERVER_ADVERTISED_LISTENERS: "http://kafka-1:8090"
      KAFKA_LISTENER_NAME_SASL_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG: >
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka-service"
        password="KafkaServicePass123";
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"  # Run kafka-storage random-uuid to generate
    volumes:
      - kafka1_data:/var/lib/kafka/data
      - ./secrets:/etc/kafka/secrets:ro
 
  kafka-2:
    <<: *kafka-common
    container_name: kafka-2
    hostname: kafka-2
    ports:
      - "9093:9092"
      - "8091:8090"
    environment:
      <<: *kafka-env
      KAFKA_NODE_ID: "2"
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093"
      KAFKA_LISTENERS: "SASL_PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "SASL_PLAINTEXT://kafka-2:9092"
      KAFKA_CONFLUENT_METADATA_SERVER_LISTENERS: "http://0.0.0.0:8090"
      KAFKA_CONFLUENT_METADATA_SERVER_ADVERTISED_LISTENERS: "http://kafka-2:8090"
      KAFKA_LISTENER_NAME_SASL_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG: >
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka-service"
        password="KafkaServicePass123";
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"
    volumes:
      - kafka2_data:/var/lib/kafka/data
      - ./secrets:/etc/kafka/secrets:ro
 
  kafka-3:
    <<: *kafka-common
    container_name: kafka-3
    hostname: kafka-3
    ports:
      - "9094:9092"
      - "8092:8090"
    environment:
      <<: *kafka-env
      KAFKA_NODE_ID: "3"
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093"
      KAFKA_LISTENERS: "SASL_PLAINTEXT://:9092,CONTROLLER://:9093"
      KAFKA_ADVERTISED_LISTENERS: "SASL_PLAINTEXT://kafka-3:9092"
      KAFKA_CONFLUENT_METADATA_SERVER_LISTENERS: "http://0.0.0.0:8090"
      KAFKA_CONFLUENT_METADATA_SERVER_ADVERTISED_LISTENERS: "http://kafka-3:8090"
      KAFKA_LISTENER_NAME_SASL_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG: >
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="kafka-service"
        password="KafkaServicePass123";
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"
    volumes:
      - kafka3_data:/var/lib/kafka/data
      - ./secrets:/etc/kafka/secrets:ro
 
volumes:
  ldap_data:
  ldap_config:
  kafka1_data:
  kafka2_data:
  kafka3_data:

PART 6 — TOKEN KEYPAIR GENERATION (MDS Requirement)

MDS issues JWT tokens. Both the private key (for signing) and public key (for verification) must be on all nodes.

# Generate RSA keypair
mkdir -p ./secrets
 
# Private key
openssl genrsa -out ./secrets/tokenKeypair.pem 2048
 
# Public key (for clients verifying MDS tokens)
openssl rsa -in ./secrets/tokenKeypair.pem \
  -outform PEM \
  -pubout \
  -out ./secrets/public.pem
 
chmod 600 ./secrets/tokenKeypair.pem
chmod 644 ./secrets/public.pem

Place tokenKeypair.pem on every broker. The MDS token path in config should point to this file.
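Before copying the keypair around, it's worth confirming that public.pem really was derived from tokenKeypair.pem; a modulus mismatch surfaces later as opaque 401s. A sketch (the broker hostnames in the copy loop are assumptions):

```shell
#!/bin/bash
# Verify the public key matches the private keypair by comparing RSA moduli.

keys_match() {
  local m1 m2
  m1=$(openssl rsa -in "$1" -modulus -noout)
  m2=$(openssl rsa -pubin -in "$2" -modulus -noout)
  [ "$m1" = "$m2" ]
}

# keys_match ./secrets/tokenKeypair.pem ./secrets/public.pem || exit 1
#
# Then distribute (hosts are assumptions):
# for h in kafka-1 kafka-2 kafka-3; do
#   scp ./secrets/tokenKeypair.pem "$h:/etc/kafka/secrets/"
# done
```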


PART 7 — STARTUP ORDER AND HEALTH CHECKS

KRaft has a specific startup order requirement:

# 1. Start LDAP first
docker-compose up -d openldap
 
# 2. Verify LDAP is healthy
ldapsearch -x -H ldap://localhost:389 \
  -D "cn=admin,dc=company,dc=com" -w AdminSecret123 \
  -b "dc=company,dc=com" "(objectClass=*)" dn
 
# 3. Bootstrap LDAP structure (run once)
ldapadd -x -H ldap://localhost:389 \
  -D "cn=admin,dc=company,dc=com" -w AdminSecret123 \
  -f bootstrap.ldif
 
# 4. Start all Kafka nodes simultaneously
# (unlike ZK mode, all KRaft nodes discover each other via controller.quorum.voters)
docker-compose up -d kafka-1 kafka-2 kafka-3
 
# 5. Wait for quorum to form (check logs)
docker-compose logs -f kafka-1 | grep -E "Metadata|quorum|controller"
 
# 6. Verify cluster metadata
kafka-metadata-quorum \
  --bootstrap-server kafka-1:9092 \
  --command-config client.properties \
  describe --status
 
# Good output:
# ClusterId: MkU3OEVBNTcwNTJENDM2Qg
# LeaderId: 1
# LeaderEpoch: 1
# HighWatermark: 5
# MaxFollowerLag: 0
# MaxFollowerLagTimeMs: 5
# CurrentVoters: [1,2,3]
# CurrentObservers: []
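Step 6 authenticates with client.properties, which Part 10 defines for a developer; an admin variant for metadata commands might look like this (the kafka-admin user and password are assumptions carried over from the Part 8 script):

```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="kafka-admin" \
  password="KafkaAdmin123";
```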

PART 8 — DYNAMIC USER CREATION + RBAC AUTOMATION

Script: Full Onboarding (LDAP + RBAC in one shot)

#!/bin/bash
# onboard-user.sh — creates LDAP user and assigns Kafka RBAC in one workflow
 
set -euo pipefail
 
LDAP_HOST="ldap://localhost:389"
LDAP_ADMIN_DN="cn=admin,dc=company,dc=com"
LDAP_ADMIN_PASS="AdminSecret123"
BASE_DN="dc=company,dc=com"
MDS_URL="http://kafka-1:8090"
MDS_ADMIN="kafka-admin"
MDS_ADMIN_PASS="KafkaAdmin123"
CLUSTER_ID="MkU3OEVBNTcwNTJENDM2Qg"
 
# Args: username firstname lastname email role(developer|devops) env(dev|staging|prod)
USERNAME=$1
FIRSTNAME=$2
LASTNAME=$3
EMAIL=$4
ROLE=$5        # developer | devops
ENV=$6         # dev | staging | prod
 
# Map role to LDAP group
case $ROLE in
  developer) LDAP_GROUP="developers" ;;
  devops)    LDAP_GROUP="devops" ;;
  *)         echo "Unknown role: $ROLE"; exit 1 ;;
esac
 
echo "=== Step 1: Creating LDAP user ==="
HASHED=$(slappasswd -s "Welcome@${USERNAME}123")
UID_NUMBER=$(shuf -i 10000-99999 -n 1)
 
cat > /tmp/user_${USERNAME}.ldif <<EOF
dn: uid=${USERNAME},ou=People,${BASE_DN}
objectClass: inetOrgPerson
objectClass: posixAccount
objectClass: shadowAccount
cn: ${FIRSTNAME} ${LASTNAME}
sn: ${LASTNAME}
givenName: ${FIRSTNAME}
uid: ${USERNAME}
mail: ${EMAIL}
userPassword: ${HASHED}
uidNumber: ${UID_NUMBER}
gidNumber: 5000
homeDirectory: /home/${USERNAME}
loginShell: /bin/bash
description: ${ROLE} - ${ENV}
EOF
 
ldapadd -x -H "$LDAP_HOST" -D "$LDAP_ADMIN_DN" -w "$LDAP_ADMIN_PASS" \
  -f /tmp/user_${USERNAME}.ldif
echo "✅ LDAP user created"
 
echo "=== Step 2: Adding to group ==="
cat > /tmp/grp_${USERNAME}.ldif <<EOF
dn: cn=${LDAP_GROUP},ou=Groups,${BASE_DN}
changetype: modify
add: member
member: uid=${USERNAME},ou=People,${BASE_DN}
EOF
 
ldapmodify -x -H "$LDAP_HOST" -D "$LDAP_ADMIN_DN" -w "$LDAP_ADMIN_PASS" \
  -f /tmp/grp_${USERNAME}.ldif
echo "✅ Added to group: ${LDAP_GROUP}"
 
echo "=== Step 3: MDS login ==="
confluent login --url "$MDS_URL" \
  --username "$MDS_ADMIN" --password "$MDS_ADMIN_PASS"
 
echo "=== Step 4: Assign Kafka RBAC ==="
# Topic prefix for this env: dev-*, staging-*, prod-*
TOPIC_PREFIX="${ENV}-"
 
if [ "$ROLE" = "developer" ]; then
  # Developers: read + write to their env topics
  confluent iam rbac role-binding create \
    --principal "User:${USERNAME}" \
    --role DeveloperWrite \
    --resource "Topic:${TOPIC_PREFIX}" \
    --prefix \
    --kafka-cluster-id "$CLUSTER_ID"
 
  confluent iam rbac role-binding create \
    --principal "User:${USERNAME}" \
    --role DeveloperRead \
    --resource "Topic:${TOPIC_PREFIX}" \
    --prefix \
    --kafka-cluster-id "$CLUSTER_ID"
 
  confluent iam rbac role-binding create \
    --principal "User:${USERNAME}" \
    --role DeveloperRead \
    --resource "Group:${USERNAME}-" \
    --prefix \
    --kafka-cluster-id "$CLUSTER_ID"
 
elif [ "$ROLE" = "devops" ]; then
  # DevOps: full resource ownership on their env
  confluent iam rbac role-binding create \
    --principal "User:${USERNAME}" \
    --role ResourceOwner \
    --resource "Topic:${TOPIC_PREFIX}" \
    --prefix \
    --kafka-cluster-id "$CLUSTER_ID"
 
  confluent iam rbac role-binding create \
    --principal "User:${USERNAME}" \
    --role ResourceOwner \
    --resource "Group:${TOPIC_PREFIX}" \
    --prefix \
    --kafka-cluster-id "$CLUSTER_ID"
 
  # DevOps can also view cluster metadata
  confluent iam rbac role-binding create \
    --principal "User:${USERNAME}" \
    --role Operator \
    --kafka-cluster-id "$CLUSTER_ID"
fi
 
echo ""
echo "✅ User ${USERNAME} fully onboarded"
echo "   LDAP group: ${LDAP_GROUP}"
echo "   Kafka role: ${ROLE} on env: ${ENV}"
echo "   Temp password: Welcome@${USERNAME}123 (force change on first login)"
 
# Cleanup
rm -f /tmp/user_${USERNAME}.ldif /tmp/grp_${USERNAME}.ldif

Usage examples:

./onboard-user.sh alice Alice Smith alice@company.com developer dev
./onboard-user.sh bob Bob Jones bob@company.com devops staging
./onboard-user.sh carol Carol White carol@company.com developer prod
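onboard-user.sh indexes $1 through $6 without validation, so a bad invocation can leave a half-created LDAP entry. A small pre-flight check you could add before Step 1 (a sketch; the accepted role and env values mirror the script's own case statements):

```shell
#!/bin/bash
# Pre-flight argument check for onboard-user.sh.

validate_args() {
  # usage: validate_args username firstname lastname email role env
  [ "$#" -eq 6 ] || { echo "expected 6 args, got $#"; return 1; }
  case $5 in
    developer|devops) ;;
    *) echo "unknown role: $5"; return 1 ;;
  esac
  case $6 in
    dev|staging|prod) ;;
    *) echo "unknown env: $6"; return 1 ;;
  esac
  echo "args ok"
}

# At the top of onboard-user.sh: validate_args "$@" || exit 1
```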

PART 9 — GROUP-BASED RBAC (Preferred Approach)

Instead of binding every user individually, bind the LDAP group once. All members inherit access automatically.

# Get cluster ID
CLUSTER_ID=$(kafka-cluster cluster-id \
  --bootstrap-server kafka-1:9092 \
  --command-config client.properties)
 
# ── Developers group ──────────────────────────────────
 
# Read + Write all dev- topics
confluent iam rbac role-binding create \
  --principal Group:developers \
  --role DeveloperWrite \
  --resource "Topic:dev-" \
  --prefix \
  --kafka-cluster-id $CLUSTER_ID
 
confluent iam rbac role-binding create \
  --principal Group:developers \
  --role DeveloperRead \
  --resource "Topic:dev-" \
  --prefix \
  --kafka-cluster-id $CLUSTER_ID
 
# Read schemas on dev subjects
confluent iam rbac role-binding create \
  --principal Group:developers \
  --role DeveloperRead \
  --resource "Subject:dev-" \
  --prefix \
  --schema-registry-cluster-id $SR_CLUSTER_ID
 
# ── DevOps group ──────────────────────────────────────
 
# Full ownership on all topics
confluent iam rbac role-binding create \
  --principal Group:devops \
  --role ResourceOwner \
  --resource "Topic:*" \
  --prefix \
  --kafka-cluster-id $CLUSTER_ID
 
# Cluster operations (connectors, health, configs)
confluent iam rbac role-binding create \
  --principal Group:devops \
  --role Operator \
  --kafka-cluster-id $CLUSTER_ID
 
# ── kafka-admins group ────────────────────────────────
 
confluent iam rbac role-binding create \
  --principal Group:kafka-admins \
  --role SystemAdmin \
  --kafka-cluster-id $CLUSTER_ID

PART 10 — KAFKA CLI WITH LDAP CREDENTIALS

client.properties (for CLI tools)

# For developers
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="alice" \
  password="TempPass@123";

Day-to-day CLI operations

# Produce to a topic
kafka-console-producer \
  --bootstrap-server kafka-1:9092 \
  --topic dev-orders \
  --producer.config /etc/kafka/client.properties
 
# Consume from a topic
kafka-console-consumer \
  --bootstrap-server kafka-1:9092 \
  --topic dev-orders \
  --group alice-dev-group \
  --from-beginning \
  --consumer.config /etc/kafka/client.properties
 
# List topics (as developer)
kafka-topics \
  --bootstrap-server kafka-1:9092 \
  --command-config /etc/kafka/client.properties \
  --list
 
# Create a topic (devops only — needs ResourceOwner or DeveloperManage)
kafka-topics \
  --bootstrap-server kafka-1:9092 \
  --command-config /etc/kafka/devops-client.properties \
  --create \
  --topic dev-payments \
  --partitions 6 \
  --replication-factor 3
 
# Describe consumer group lag
kafka-consumer-groups \
  --bootstrap-server kafka-1:9092 \
  --command-config /etc/kafka/client.properties \
  --describe \
  --group alice-dev-group

PART 11 — KRAFT-SPECIFIC MONITORING AND OPS

Check KRaft Quorum Status

# Quorum status (run with an authenticated admin client config)
kafka-metadata-quorum \
  --bootstrap-server kafka-1:9092 \
  --command-config admin-client.properties \
  describe --status
 
# Quorum replication status
kafka-metadata-quorum \
  --bootstrap-server kafka-1:9092 \
  --command-config admin-client.properties \
  describe --replication
 
# View metadata log (KRaft replaces ZK znodes with this)
kafka-metadata-shell \
  --snapshot /var/lib/kafka/metadata/__cluster_metadata-0/00000000000000000000.log

Check MDS Health

# MDS health endpoint
curl http://kafka-1:8090/kafka/v3/clusters
 
# MDS token endpoint (verify LDAP auth is working)
curl -u alice:TempPass@123 \
  http://kafka-1:8090/security/1.0/authenticate
 
# Response includes a bearer token — if LDAP auth fails, this returns 401
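The snippets below call a get-mds-token helper, which is not a stock CLI tool. One possible implementation, sourced into your shell (the URL, credentials, and the auth_token field name are assumptions about the MDS /security/1.0/authenticate response):

```shell
#!/bin/bash
# Hypothetical helper matching the $(get-mds-token) calls below; source this
# file, or save the body as an executable named get-mds-token on PATH.

extract_token() {
  # Pull the auth_token value out of the MDS JSON response on stdin.
  sed -n 's/.*"auth_token" *: *"\([^"]*\)".*/\1/p'
}

get_mds_token() {
  local url="${MDS_URL:-http://kafka-1:8090}"
  curl -s -u "${MDS_USER}:${MDS_PASS}" \
    "${url}/security/1.0/authenticate" | extract_token
}
```

If saved as a standalone get-mds-token script, end it with a call to get_mds_token.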

LDAP Sync Verification

# Force MDS to refresh its LDAP group cache
curl -X POST \
  -H "Authorization: Bearer $(get-mds-token)" \
  http://kafka-1:8090/security/1.0/ldap/groups/refresh
 
# List groups MDS has loaded from LDAP
curl \
  -H "Authorization: Bearer $(get-mds-token)" \
  http://kafka-1:8090/security/1.0/ldap/groups

PART 12 — TROUBLESHOOTING KRAFT + LDAP

Common Issues

Problem                        | Kafka error                | Root cause                               | Fix
-------------------------------|----------------------------|------------------------------------------|------------------------------------------------------
Cluster won't start            | No leader elected          | Mismatched CLUSTER_ID across nodes       | Re-format all nodes with the same ID
Quorum won't form              | Unable to find leader      | controller.quorum.voters hostnames wrong | Verify DNS resolution between nodes
LDAP auth fails                | SASL authentication failed | Callback handler class not on classpath  | Verify CP Server (not community) image
RBAC not applying              | Authorization failed       | MDS didn't load groups yet               | Wait for ldap.refresh.interval.ms or refresh manually
MDS 401 on token               | Token verification failed  | tokenKeypair.pem mismatch between nodes  | Distribute the same keypair to all brokers
User added to group, no access | Groups stale in MDS        | Cache not expired yet                    | Reduce refresh interval or restart
node.id not set                | Broker fails to start      | Missing mandatory KRaft config           | Add KAFKA_NODE_ID to all nodes

Log Locations to Watch

# KRaft controller election events
docker logs kafka-1 2>&1 | grep -E "Raft|leader|epoch|quorum"
 
# LDAP bind attempts (auth events)
docker logs kafka-1 2>&1 | grep -E "LDAP|ldap|authenticate|SASL"
 
# MDS startup and group load
docker logs kafka-1 2>&1 | grep -E "MDS|metadata|group.*loaded"
 
# Authorization decisions
docker logs kafka-1 2>&1 | grep -E "Authoriz|RBAC|deny|allow"

Reset a Node (KRaft way — no ZK to wipe)

# Stop node
docker-compose stop kafka-2
 
# Wipe data and metadata logs
docker exec kafka-2 rm -rf /var/lib/kafka/data/* /var/lib/kafka/metadata/*
 
# Re-format with SAME cluster ID
docker exec kafka-2 kafka-storage format \
  -t MkU3OEVBNTcwNTJENDM2Qg \
  -c /etc/kafka/server.properties
 
# Restart
docker-compose start kafka-2

PART 13 — KRaft + LDAP QUICK REFERENCE

Startup checklist:
  1. Generate cluster ID:     kafka-storage random-uuid
  2. Format all nodes:        kafka-storage format -t <id> -c server.properties
  3. Generate MDS keypair:    openssl genrsa -out tokenKeypair.pem 2048
  4. Bootstrap LDAP:          ldapadd -f bootstrap.ldif
  5. Start nodes (all at once, no ZK ordering)
  6. Verify quorum:           kafka-metadata-quorum describe --status
  7. Login to MDS:            confluent login --url http://kafka-1:8090
  8. Create RBAC bindings:    confluent iam rbac role-binding create ...

KRaft-only mandatory configs vs ZK mode:
  process.roles        = broker | controller | broker,controller
  node.id              = unique integer per node (replaces broker.id)
  controller.quorum.voters = id@host:port,id@host:port,...
  metadata.log.dir     = path for Raft log (separate from data)
  CLUSTER_ID           = must be identical across all nodes

What stayed the same:
  - All ldap.* config keys (auth)
  - All confluent.metadata.server.ldap.* keys (MDS/RBAC)
  - All RBAC role binding commands
  - Client SASL/PLAIN config
  - LDAP user/group structure in OpenLDAP

The single biggest operational difference between ZooKeeper and KRaft is the cluster ID lifecycle — in ZK mode it was automatic; in KRaft you generate it once, format every node with it, and it never changes for the life of the cluster.