PART 1 – WHAT CHANGED IN KRAFT
KRaft (Kafka Raft) replaces ZooKeeper entirely: Kafka nodes now manage their own metadata using the Raft consensus protocol. This changes LDAP integration in a few key ways.

Key differences from ZooKeeper mode:
| Area | ZooKeeper mode | KRaft mode |
|---|---|---|
| Metadata store | ZooKeeper ensemble | Raft log inside controllers |
| MDS | Separate deployment | Embedded in broker/controller |
| server.properties key prefix | confluent.metadata.server.* | Same prefix, but no ZK config |
| Node roles | Broker only | broker, controller, or combined |
| process.roles | Not applicable | Mandatory new config |
| Node ID | broker.id | node.id (mandatory, replaces broker.id) |
| Cluster ID | Auto-generated by ZK | Must be manually generated with kafka-storage |
PART 2 – KRAFT NODE ROLES
KRaft introduces explicit role assignments per node: broker, controller, or both combined, set via process.roles in server.properties.
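As a sketch (hostnames and ports are placeholders, not from this cluster), the three assignments look like this in server.properties:

```properties
# Combined node - one JVM acts as broker and controller (dev/small clusters)
process.roles=broker,controller

# Dedicated controller - metadata quorum only, no client traffic (production)
# process.roles=controller

# Dedicated broker - serves clients, delegates metadata to the controllers
# process.roles=broker

# Every node, whatever its role, must know the controller quorum:
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
```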
PART 3 – KRAFT CLUSTER INITIALIZATION (Critical First Step)
Before any broker starts, you must format the storage with a cluster ID. This replaces ZooKeeper's auto-bootstrap.
# Step 1: Generate a unique cluster ID (do this ONCE per cluster)
KAFKA_CLUSTER_ID=$(kafka-storage random-uuid)
echo "Cluster ID: $KAFKA_CLUSTER_ID"
# Example: MkU3OEVBNTcwNTJENDM2Qg
# Step 2: Format storage on EVERY node (controllers + brokers)
kafka-storage format \
-t $KAFKA_CLUSTER_ID \
-c /etc/kafka/server.properties
# Output you want to see:
# Formatting /var/lib/kafka/data with metadata.version 3.6-IV2.

Critical: all nodes in the cluster must be formatted with the same cluster ID. A mismatch means the cluster won't form.
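A cluster-ID mismatch is easy to catch before startup. A sketch, assuming each node's meta.properties (written by kafka-storage format into log.dirs) has been copied somewhere local; the helper name and paths are hypothetical:

```shell
# check_cluster_ids: compare cluster.id across collected meta.properties
# files; more than one distinct ID means the quorum will never form.
check_cluster_ids() {
  local ids
  ids=$(grep -h '^cluster.id=' "$@" | cut -d= -f2- | sort -u)
  if [ "$(printf '%s\n' "$ids" | wc -l)" -eq 1 ]; then
    echo "OK: all nodes share cluster ID $ids"
  else
    echo "MISMATCH across nodes:"
    printf '%s\n' "$ids"
    return 1
  fi
}
# e.g. check_cluster_ids node1/meta.properties node2/meta.properties node3/meta.properties
```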
PART 4 – KRAFT SERVER.PROPERTIES: COMPLETE CONFIGS
4A – Combined Mode (Dev / Small Cluster, 3 nodes)
Each node acts as both broker and controller. This is the simplest setup for teams getting started.
Node 1 (server-1.properties):
##########################
# KRaft Identity
##########################
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
##########################
# Listeners
##########################
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=SASL_PLAINTEXT://kafka-1:9092
listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT
inter.broker.listener.name=SASL_PLAINTEXT
controller.listener.names=CONTROLLER
##########################
# Storage
##########################
log.dirs=/var/lib/kafka/data
metadata.log.dir=/var/lib/kafka/metadata
##########################
# SASL/PLAIN via LDAP – Authentication
##########################
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# Broker-to-broker JAAS (uses service account)
listener.name.sasl_plaintext.plain.sasl.jaas.config=\
org.apache.kafka.common.security.plain.PlainLoginModule required \
username="kafka-service" \
password="KafkaServicePass123";
# LDAP callback handler for client authentication
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=\
io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler
##########################
# LDAP Connection (for authentication)
##########################
ldap.java.naming.provider.url=ldap://ldap-server:389
ldap.java.naming.security.principal=uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
ldap.java.naming.security.credentials=KafkaServicePass123
ldap.java.naming.security.authentication=simple
ldap.java.naming.factory.initial=com.sun.jndi.ldap.LdapCtxFactory
ldap.user.search.base=ou=People,dc=company,dc=com
ldap.user.search.filter=(uid={0})
ldap.user.name.attribute=uid
ldap.user.object.class=inetOrgPerson
##########################
# MDS (Metadata Service – embedded) + RBAC
##########################
confluent.metadata.server.listeners=http://0.0.0.0:8090
confluent.metadata.server.advertised.listeners=http://kafka-1:8090
confluent.metadata.server.authentication.method=BEARER
confluent.metadata.server.user.store=LDAP
# Token keypair for JWT (MDS)
confluent.metadata.server.token.key.path=/etc/kafka/secrets/tokenKeypair.pem
# LDAP config for MDS (can reuse same LDAP server)
confluent.metadata.server.ldap.java.naming.provider.url=ldap://ldap-server:389
confluent.metadata.server.ldap.java.naming.security.principal=uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
confluent.metadata.server.ldap.java.naming.security.credentials=KafkaServicePass123
confluent.metadata.server.ldap.java.naming.security.authentication=simple
# User lookup
confluent.metadata.server.ldap.user.search.base=ou=People,dc=company,dc=com
confluent.metadata.server.ldap.user.name.attribute=uid
confluent.metadata.server.ldap.user.object.class=inetOrgPerson
confluent.metadata.server.ldap.user.search.filter=(uid={0})
# Group lookup (drives group-based RBAC)
confluent.metadata.server.ldap.group.search.base=ou=Groups,dc=company,dc=com
confluent.metadata.server.ldap.group.object.class=groupOfNames
confluent.metadata.server.ldap.group.name.attribute=cn
confluent.metadata.server.ldap.group.member.attribute=member
confluent.metadata.server.ldap.group.member.attribute.pattern=uid=([^,]+).*
confluent.metadata.server.ldap.refresh.interval.ms=60000
# Authorizer
confluent.authorizer.access.rule.providers=CONFLUENT
confluent.metadata.server.token.max.lifetime.ms=3600000
##########################
# Replication & Defaults
##########################
offsets.topic.replication.factor=3
default.replication.factor=3
min.insync.replicas=2
num.partitions=3

Node 2 – identical except node.id=2, advertised.listeners=SASL_PLAINTEXT://kafka-2:9092, and confluent.metadata.server.advertised.listeners=http://kafka-2:8090.
Node 3 – same pattern with node.id=3.
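The per-node deltas are small enough to script. A sketch with a hypothetical helper, assuming the property names used in this section and that only node.id and the two advertised listeners differ per node:

```shell
# gen_node_props: derive server-N.properties from server-1.properties by
# rewriting only the values that differ per node.
gen_node_props() {  # usage: gen_node_props <template> <node_id> <hostname>
  sed -E \
    -e "s|^node\.id=.*|node.id=$2|" \
    -e "s|^advertised\.listeners=.*|advertised.listeners=SASL_PLAINTEXT://$3:9092|" \
    -e "s|^confluent\.metadata\.server\.advertised\.listeners=.*|confluent.metadata.server.advertised.listeners=http://$3:8090|" \
    "$1"
}
# gen_node_props server-1.properties 2 kafka-2 > server-2.properties
```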
4B – Separated Mode (Production: Dedicated Controllers + Brokers)
Controller nodes (controller.properties) – these nodes do NOT serve clients:
##########################
# KRaft Identity
##########################
process.roles=controller
node.id=1
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
##########################
# Listeners (controller-only, no client listener)
##########################
listeners=CONTROLLER://:9093
listener.security.protocol.map=CONTROLLER:PLAINTEXT
controller.listener.names=CONTROLLER
##########################
# Storage
##########################
log.dirs=/var/lib/kafka/metadata
metadata.log.dir=/var/lib/kafka/metadata
##########################
# No SASL/LDAP on pure controllers
# (they don't accept client connections)
##########################

Broker nodes (broker.properties) – these serve clients and embed MDS:
##########################
# KRaft Identity
##########################
process.roles=broker
node.id=101
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
##########################
# Listeners
##########################
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://broker-1:9092
listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT
inter.broker.listener.name=SASL_PLAINTEXT
controller.listener.names=CONTROLLER
##########################
# Storage
##########################
log.dirs=/var/lib/kafka/data
##########################
# SASL + LDAP Auth
##########################
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
listener.name.sasl_plaintext.plain.sasl.jaas.config=\
org.apache.kafka.common.security.plain.PlainLoginModule required \
username="kafka-service" \
password="KafkaServicePass123";
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=\
io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler
ldap.java.naming.provider.url=ldap://ldap-server:389
ldap.java.naming.security.principal=uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
ldap.java.naming.security.credentials=KafkaServicePass123
ldap.java.naming.security.authentication=simple
ldap.user.search.base=ou=People,dc=company,dc=com
ldap.user.search.filter=(uid={0})
ldap.user.name.attribute=uid
ldap.user.object.class=inetOrgPerson
##########################
# MDS embedded in broker
##########################
confluent.metadata.server.listeners=http://0.0.0.0:8090
confluent.metadata.server.advertised.listeners=http://broker-1:8090
confluent.metadata.server.authentication.method=BEARER
confluent.metadata.server.user.store=LDAP
confluent.metadata.server.token.key.path=/etc/kafka/secrets/tokenKeypair.pem
confluent.metadata.server.ldap.java.naming.provider.url=ldap://ldap-server:389
confluent.metadata.server.ldap.java.naming.security.principal=uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
confluent.metadata.server.ldap.java.naming.security.credentials=KafkaServicePass123
confluent.metadata.server.ldap.java.naming.security.authentication=simple
confluent.metadata.server.ldap.user.search.base=ou=People,dc=company,dc=com
confluent.metadata.server.ldap.user.name.attribute=uid
confluent.metadata.server.ldap.user.object.class=inetOrgPerson
confluent.metadata.server.ldap.user.search.filter=(uid={0})
confluent.metadata.server.ldap.group.search.base=ou=Groups,dc=company,dc=com
confluent.metadata.server.ldap.group.object.class=groupOfNames
confluent.metadata.server.ldap.group.name.attribute=cn
confluent.metadata.server.ldap.group.member.attribute=member
confluent.metadata.server.ldap.group.member.attribute.pattern=uid=([^,]+).*
confluent.metadata.server.ldap.refresh.interval.ms=60000
confluent.authorizer.access.rule.providers=CONFLUENT
confluent.metadata.server.token.max.lifetime.ms=3600000

PART 5 – DOCKER COMPOSE: FULL KRAFT CLUSTER
Combined Mode (3-node, dev-ready)
version: '3.8'
x-kafka-common: &kafka-common
image: confluentinc/cp-server:7.6.0
restart: unless-stopped
volumes:
- ./secrets:/etc/kafka/secrets:ro
environment: &kafka-env
# LDAP auth
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT"
KAFKA_INTER_BROKER_LISTENER_NAME: SASL_PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_NAME_SASL_PLAINTEXT_PLAIN_SASL_SERVER_CALLBACK_HANDLER_CLASS: >
io.confluent.security.auth.provider.ldap.LdapAuthenticateCallbackHandler
# LDAP connection
KAFKA_LDAP_JAVA_NAMING_PROVIDER_URL: ldap://openldap:389
KAFKA_LDAP_JAVA_NAMING_SECURITY_PRINCIPAL: uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
KAFKA_LDAP_JAVA_NAMING_SECURITY_CREDENTIALS: KafkaServicePass123
KAFKA_LDAP_JAVA_NAMING_SECURITY_AUTHENTICATION: simple
KAFKA_LDAP_USER_SEARCH_BASE: ou=People,dc=company,dc=com
KAFKA_LDAP_USER_SEARCH_FILTER: "(uid={0})"
KAFKA_LDAP_USER_NAME_ATTRIBUTE: uid
KAFKA_LDAP_USER_OBJECT_CLASS: inetOrgPerson
# MDS + RBAC
KAFKA_CONFLUENT_METADATA_SERVER_AUTHENTICATION_METHOD: BEARER
KAFKA_CONFLUENT_METADATA_SERVER_USER_STORE: LDAP
KAFKA_CONFLUENT_METADATA_SERVER_TOKEN_KEY_PATH: /etc/kafka/secrets/tokenKeypair.pem
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_JAVA_NAMING_PROVIDER_URL: ldap://openldap:389
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_JAVA_NAMING_SECURITY_PRINCIPAL: uid=kafka-service,ou=ServiceAccounts,dc=company,dc=com
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_JAVA_NAMING_SECURITY_CREDENTIALS: KafkaServicePass123
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_JAVA_NAMING_SECURITY_AUTHENTICATION: simple
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_USER_SEARCH_BASE: ou=People,dc=company,dc=com
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_USER_NAME_ATTRIBUTE: uid
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_USER_OBJECT_CLASS: inetOrgPerson
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_USER_SEARCH_FILTER: "(uid={0})"
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_GROUP_SEARCH_BASE: ou=Groups,dc=company,dc=com
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_GROUP_OBJECT_CLASS: groupOfNames
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_GROUP_NAME_ATTRIBUTE: cn
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_GROUP_MEMBER_ATTRIBUTE: member
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_GROUP_MEMBER_ATTRIBUTE_PATTERN: "uid=([^,]+).*"
KAFKA_CONFLUENT_METADATA_SERVER_LDAP_REFRESH_INTERVAL_MS: "60000"
KAFKA_CONFLUENT_AUTHORIZER_ACCESS_RULE_PROVIDERS: CONFLUENT
# Replication
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
KAFKA_DEFAULT_REPLICATION_FACTOR: "3"
KAFKA_MIN_INSYNC_REPLICAS: "2"
KAFKA_NUM_PARTITIONS: "3"
services:
openldap:
image: osixia/openldap:1.5.0
container_name: openldap
environment:
LDAP_DOMAIN: company.com
LDAP_ADMIN_PASSWORD: AdminSecret123
LDAP_READONLY_USER: "true"
LDAP_READONLY_USER_PASSWORD: ReadOnly123
ports:
- "389:389"
volumes:
- ldap_data:/var/lib/ldap
- ldap_config:/etc/ldap/slapd.d
kafka-1:
<<: *kafka-common
container_name: kafka-1
hostname: kafka-1
ports:
- "9092:9092"
- "8090:8090"
environment:
<<: *kafka-env
KAFKA_NODE_ID: "1"
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093"
KAFKA_LISTENERS: "SASL_PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_ADVERTISED_LISTENERS: "SASL_PLAINTEXT://kafka-1:9092"
KAFKA_CONFLUENT_METADATA_SERVER_LISTENERS: "http://0.0.0.0:8090"
KAFKA_CONFLUENT_METADATA_SERVER_ADVERTISED_LISTENERS: "http://kafka-1:8090"
KAFKA_LISTENER_NAME_SASL_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG: >
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka-service"
password="KafkaServicePass123";
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg" # Run kafka-storage random-uuid to generate
volumes:
- kafka1_data:/var/lib/kafka/data
- ./secrets:/etc/kafka/secrets:ro
kafka-2:
<<: *kafka-common
container_name: kafka-2
hostname: kafka-2
ports:
- "9093:9092"
- "8091:8090"
environment:
<<: *kafka-env
KAFKA_NODE_ID: "2"
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093"
KAFKA_LISTENERS: "SASL_PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_ADVERTISED_LISTENERS: "SASL_PLAINTEXT://kafka-2:9092"
KAFKA_CONFLUENT_METADATA_SERVER_LISTENERS: "http://0.0.0.0:8090"
KAFKA_CONFLUENT_METADATA_SERVER_ADVERTISED_LISTENERS: "http://kafka-2:8090"
KAFKA_LISTENER_NAME_SASL_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG: >
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka-service"
password="KafkaServicePass123";
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"
volumes:
- kafka2_data:/var/lib/kafka/data
- ./secrets:/etc/kafka/secrets:ro
kafka-3:
<<: *kafka-common
container_name: kafka-3
hostname: kafka-3
ports:
- "9094:9092"
- "8092:8090"
environment:
<<: *kafka-env
KAFKA_NODE_ID: "3"
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093"
KAFKA_LISTENERS: "SASL_PLAINTEXT://:9092,CONTROLLER://:9093"
KAFKA_ADVERTISED_LISTENERS: "SASL_PLAINTEXT://kafka-3:9092"
KAFKA_CONFLUENT_METADATA_SERVER_LISTENERS: "http://0.0.0.0:8090"
KAFKA_CONFLUENT_METADATA_SERVER_ADVERTISED_LISTENERS: "http://kafka-3:8090"
KAFKA_LISTENER_NAME_SASL_PLAINTEXT_PLAIN_SASL_JAAS_CONFIG: >
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka-service"
password="KafkaServicePass123";
CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"
volumes:
- kafka3_data:/var/lib/kafka/data
- ./secrets:/etc/kafka/secrets:ro
volumes:
ldap_data:
ldap_config:
kafka1_data:
kafka2_data:
kafka3_data:

PART 6 – TOKEN KEYPAIR GENERATION (MDS Requirement)
MDS issues JWT tokens. Both the private key (for signing) and public key (for verification) must be on all nodes.
# Generate RSA keypair
mkdir -p ./secrets
# Private key
openssl genrsa -out ./secrets/tokenKeypair.pem 2048
# Public key (for clients verifying MDS tokens)
openssl rsa -in ./secrets/tokenKeypair.pem \
-outform PEM \
-pubout \
-out ./secrets/public.pem
chmod 600 ./secrets/tokenKeypair.pem
chmod 644 ./secrets/public.pem

Place tokenKeypair.pem on every broker. The MDS token path in config should point to this file.
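A keypair mismatch between nodes surfaces later as opaque MDS 401s, so it is worth verifying the pair up front. A sketch (hypothetical helper; paths match the commands above) comparing RSA moduli with openssl:

```shell
# verify_token_keys: confirm public.pem really derives from
# tokenKeypair.pem by comparing the RSA modulus of each.
verify_token_keys() {  # usage: verify_token_keys <private.pem> <public.pem>
  local priv pub
  priv=$(openssl rsa -in "$1" -noout -modulus)
  pub=$(openssl rsa -pubin -in "$2" -noout -modulus)
  if [ "$priv" = "$pub" ]; then
    echo "keys match"
  else
    echo "KEY MISMATCH: MDS token verification will fail" >&2
    return 1
  fi
}
# verify_token_keys ./secrets/tokenKeypair.pem ./secrets/public.pem
```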
PART 7 – STARTUP ORDER AND HEALTH CHECKS
KRaft has a specific startup order requirement:
# 1. Start LDAP first
docker-compose up -d openldap
# 2. Verify LDAP is healthy
ldapsearch -x -H ldap://localhost:389 \
-D "cn=admin,dc=company,dc=com" -w AdminSecret123 \
-b "dc=company,dc=com" "(objectClass=*)" dn
# 3. Bootstrap LDAP structure (run once)
ldapadd -x -H ldap://localhost:389 \
-D "cn=admin,dc=company,dc=com" -w AdminSecret123 \
-f bootstrap.ldif
# 4. Start all Kafka nodes simultaneously
# (unlike ZK mode, all KRaft nodes discover each other via quorum.voters)
docker-compose up -d kafka-1 kafka-2 kafka-3
# 5. Wait for quorum to form (check logs)
docker-compose logs -f kafka-1 | grep -E "Metadata|quorum|controller"
# 6. Verify cluster metadata
kafka-metadata-quorum \
--bootstrap-server kafka-1:9092 \
--command-config client.properties \
describe --status
# Good output:
# ClusterId: MkU3OEVBNTcwNTJENDM2Qg
# LeaderId: 1
# LeaderEpoch: 1
# HighWatermark: 5
# MaxFollowerLag: 0
# MaxFollowerLagTimeMs: 5
# CurrentVoters: [1,2,3]
# CurrentObservers: []

PART 8 – DYNAMIC USER CREATION + RBAC AUTOMATION
Script: Full Onboarding (LDAP + RBAC in one shot)
#!/bin/bash
# onboard-user.sh – creates LDAP user and assigns Kafka RBAC in one workflow
set -euo pipefail
LDAP_HOST="ldap://localhost:389"
LDAP_ADMIN_DN="cn=admin,dc=company,dc=com"
LDAP_ADMIN_PASS="AdminSecret123"
BASE_DN="dc=company,dc=com"
MDS_URL="http://kafka-1:8090"
MDS_ADMIN="kafka-admin"
MDS_ADMIN_PASS="KafkaAdmin123"
CLUSTER_ID="MkU3OEVBNTcwNTJENDM2Qg"
# Args: username firstname lastname email role(developer|devops) env(dev|staging|prod)
USERNAME=$1
FIRSTNAME=$2
LASTNAME=$3
EMAIL=$4
ROLE=$5 # developer | devops
ENV=$6 # dev | staging | prod
# Map role to LDAP group
case $ROLE in
developer) LDAP_GROUP="developers" ;;
devops) LDAP_GROUP="devops" ;;
*) echo "Unknown role: $ROLE"; exit 1 ;;
esac
echo "=== Step 1: Creating LDAP user ==="
HASHED=$(slappasswd -s "Welcome@${USERNAME}123")
UID_NUMBER=$(shuf -i 10000-99999 -n 1)
cat > /tmp/user_${USERNAME}.ldif <<EOF
dn: uid=${USERNAME},ou=People,${BASE_DN}
objectClass: inetOrgPerson
objectClass: posixAccount
objectClass: shadowAccount
cn: ${FIRSTNAME} ${LASTNAME}
sn: ${LASTNAME}
givenName: ${FIRSTNAME}
uid: ${USERNAME}
mail: ${EMAIL}
userPassword: ${HASHED}
uidNumber: ${UID_NUMBER}
gidNumber: 5000
homeDirectory: /home/${USERNAME}
loginShell: /bin/bash
description: ${ROLE} - ${ENV}
EOF
ldapadd -x -H "$LDAP_HOST" -D "$LDAP_ADMIN_DN" -w "$LDAP_ADMIN_PASS" \
-f /tmp/user_${USERNAME}.ldif
echo "✅ LDAP user created"
echo "=== Step 2: Adding to group ==="
cat > /tmp/grp_${USERNAME}.ldif <<EOF
dn: cn=${LDAP_GROUP},ou=Groups,${BASE_DN}
changetype: modify
add: member
member: uid=${USERNAME},ou=People,${BASE_DN}
EOF
ldapmodify -x -H "$LDAP_HOST" -D "$LDAP_ADMIN_DN" -w "$LDAP_ADMIN_PASS" \
-f /tmp/grp_${USERNAME}.ldif
echo "✅ Added to group: ${LDAP_GROUP}"
echo "=== Step 3: MDS login ==="
confluent login --url "$MDS_URL" \
--username "$MDS_ADMIN" --password "$MDS_ADMIN_PASS"
echo "=== Step 4: Assign Kafka RBAC ==="
# Topic prefix for this env: dev-*, staging-*, prod-*
TOPIC_PREFIX="${ENV}-"
if [ "$ROLE" = "developer" ]; then
# Developers: read + write to their env topics
confluent iam rbac role-binding create \
--principal "User:${USERNAME}" \
--role DeveloperWrite \
--resource "Topic:${TOPIC_PREFIX}" \
--prefix \
--kafka-cluster-id "$CLUSTER_ID"
confluent iam rbac role-binding create \
--principal "User:${USERNAME}" \
--role DeveloperRead \
--resource "Topic:${TOPIC_PREFIX}" \
--prefix \
--kafka-cluster-id "$CLUSTER_ID"
confluent iam rbac role-binding create \
--principal "User:${USERNAME}" \
--role DeveloperRead \
--resource "Group:${USERNAME}-" \
--prefix \
--kafka-cluster-id "$CLUSTER_ID"
elif [ "$ROLE" = "devops" ]; then
# DevOps: full resource ownership on their env
confluent iam rbac role-binding create \
--principal "User:${USERNAME}" \
--role ResourceOwner \
--resource "Topic:${TOPIC_PREFIX}" \
--prefix \
--kafka-cluster-id "$CLUSTER_ID"
confluent iam rbac role-binding create \
--principal "User:${USERNAME}" \
--role ResourceOwner \
--resource "Group:${TOPIC_PREFIX}" \
--prefix \
--kafka-cluster-id "$CLUSTER_ID"
# DevOps can also view cluster metadata
confluent iam rbac role-binding create \
--principal "User:${USERNAME}" \
--role Operator \
--kafka-cluster-id "$CLUSTER_ID"
fi
echo ""
echo "✅ User ${USERNAME} fully onboarded"
echo " LDAP group: ${LDAP_GROUP}"
echo " Kafka role: ${ROLE} on env: ${ENV}"
echo " Temp password: Welcome@${USERNAME}123 (force change on first login)"
# Cleanup
rm -f /tmp/user_${USERNAME}.ldif /tmp/grp_${USERNAME}.ldif

# Usage examples
./onboard-user.sh alice Alice Smith alice@company.com developer dev
./onboard-user.sh bob Bob Jones bob@company.com devops staging
./onboard-user.sh carol Carol White carol@company.com developer prod

PART 9 – GROUP-BASED RBAC (Preferred Approach)
Instead of binding every user individually, bind the LDAP group once. All members inherit access automatically.
# Get cluster ID
CLUSTER_ID=$(kafka-cluster cluster-id \
--bootstrap-server kafka-1:9092 \
--command-config client.properties)
# ── Developers group ──────────────────────────────────
# Read + Write all dev- topics
confluent iam rbac role-binding create \
--principal Group:developers \
--role DeveloperWrite \
--resource "Topic:dev-" \
--prefix \
--kafka-cluster-id $CLUSTER_ID
confluent iam rbac role-binding create \
--principal Group:developers \
--role DeveloperRead \
--resource "Topic:dev-" \
--prefix \
--kafka-cluster-id $CLUSTER_ID
# Read schemas on dev subjects (set SR_CLUSTER_ID to your Schema Registry cluster ID first)
confluent iam rbac role-binding create \
--principal Group:developers \
--role DeveloperRead \
--resource "Subject:dev-" \
--prefix \
--schema-registry-cluster-id $SR_CLUSTER_ID
# ── DevOps group ──────────────────────────────────────
# Full ownership on all topics
confluent iam rbac role-binding create \
--principal Group:devops \
--role ResourceOwner \
--resource "Topic:*" \
--prefix \
--kafka-cluster-id $CLUSTER_ID
# Cluster operations (connectors, health, configs)
confluent iam rbac role-binding create \
--principal Group:devops \
--role Operator \
--kafka-cluster-id $CLUSTER_ID
# ── kafka-admins group ────────────────────────────────
confluent iam rbac role-binding create \
--principal Group:kafka-admins \
--role SystemAdmin \
--kafka-cluster-id $CLUSTER_ID

PART 10 – KAFKA CLI WITH LDAP CREDENTIALS
client.properties (for CLI tools)
# For developers
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="alice" \
password="TempPass@123";

Day-to-day CLI operations
# Produce to a topic
kafka-console-producer \
--bootstrap-server kafka-1:9092 \
--topic dev-orders \
--producer.config /etc/kafka/client.properties
# Consume from a topic
kafka-console-consumer \
--bootstrap-server kafka-1:9092 \
--topic dev-orders \
--group alice-dev-group \
--from-beginning \
--consumer.config /etc/kafka/client.properties
# List topics (as developer)
kafka-topics \
--bootstrap-server kafka-1:9092 \
--command-config /etc/kafka/client.properties \
--list
# Create a topic (devops only – needs ResourceOwner or DeveloperManage)
kafka-topics \
--bootstrap-server kafka-1:9092 \
--command-config /etc/kafka/devops-client.properties \
--create \
--topic dev-payments \
--partitions 6 \
--replication-factor 3
# Describe consumer group lag
kafka-consumer-groups \
--bootstrap-server kafka-1:9092 \
--command-config /etc/kafka/client.properties \
--describe \
--group alice-dev-group

PART 11 – KRAFT-SPECIFIC MONITORING AND OPS
Check KRaft Quorum Status
# Quorum status (run with an authenticated admin client config)
kafka-metadata-quorum \
--bootstrap-server kafka-1:9092 \
--command-config admin-client.properties \
describe --status
# Quorum replication status
kafka-metadata-quorum \
--bootstrap-server kafka-1:9092 \
--command-config admin-client.properties \
describe --replication
# View metadata log (KRaft replaces ZK znodes with this)
kafka-metadata-shell \
--snapshot /var/lib/kafka/metadata/__cluster_metadata-0/00000000000000000000.log

Check MDS Health
# MDS health endpoint
curl http://kafka-1:8090/kafka/v3/clusters
# MDS token endpoint (verify LDAP auth is working)
curl -u alice:TempPass@123 \
http://kafka-1:8090/security/1.0/authenticate
# Response includes a bearer token; if LDAP auth fails, this returns 401

LDAP Sync Verification
# Force MDS to refresh its LDAP group cache
curl -X POST \
-H "Authorization: Bearer $(get-mds-token)" \
http://kafka-1:8090/security/1.0/ldap/groups/refresh
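The get-mds-token helper used in these curl commands is assumed rather than shipped with CP. A minimal sketch, written as get_mds_token (POSIX-safe name; alias it to get-mds-token if you want an exact match) — the auth_token field name, credentials, and URL are assumptions to adapt:

```shell
# extract_token: pull the auth_token value out of the MDS authenticate
# response. Assumed response shape: {"auth_token":"...","token_type":...}
extract_token() {
  sed -E 's/.*"auth_token"[[:space:]]*:[[:space:]]*"([^"]+)".*/\1/'
}

# get_mds_token: authenticate against MDS and print a bearer token.
# Credentials and URL are placeholders for your environment.
get_mds_token() {
  curl -s -u "alice:TempPass@123" \
    http://kafka-1:8090/security/1.0/authenticate | extract_token
}
```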
# List groups MDS has loaded from LDAP
curl \
-H "Authorization: Bearer $(get-mds-token)" \
http://kafka-1:8090/security/1.0/ldap/groups

PART 12 – TROUBLESHOOTING KRAFT + LDAP
Common Issues
| Problem | Kafka Error | Root Cause | Fix |
|---|---|---|---|
| Cluster won't start | No leader elected | Mismatched CLUSTER_ID across nodes | Re-format all nodes with same ID |
| Quorum won't form | Unable to find leader | controller.quorum.voters hostnames wrong | Verify DNS resolution between nodes |
| LDAP auth fails | SASL authentication failed | Callback handler class not on classpath | Verify CP Server (not community) image |
| RBAC not applying | Authorization failed | MDS didn't load groups yet | Wait for ldap.refresh.interval.ms or trigger manual refresh |
| MDS 401 on token | Token verification failed | tokenKeypair.pem mismatch between nodes | Distribute same keypair to all brokers |
| User added to group, no access | Groups stale in MDS | Cache not expired yet | Reduce refresh interval or restart |
| node.id not set | Broker fails to start | Missing mandatory KRaft config | Add KAFKA_NODE_ID to all nodes |
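One more thing worth checking for the "added to group, no access" case: confluent.metadata.server.ldap.group.member.attribute.pattern must actually match your member DN format. The pattern from Part 4 can be exercised in isolation; sed -E is used here as a stand-in for Java regex (the two dialects agree for this pattern), and the helper name is hypothetical:

```shell
# extract_member_uid: apply uid=([^,]+).* the way MDS does - capture the
# uid RDN value from a full member DN read on stdin.
extract_member_uid() {
  sed -E 's/^uid=([^,]+).*$/\1/'
}
echo "uid=alice,ou=People,dc=company,dc=com" | extract_member_uid
# prints: alice
```

If your directory stores members as full DNs with a different leading attribute (cn=..., for example), this pattern matches nothing and every group lookup silently returns empty.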
Log Locations to Watch
# KRaft controller election events
docker logs kafka-1 2>&1 | grep -E "Raft|leader|epoch|quorum"
# LDAP bind attempts (auth events)
docker logs kafka-1 2>&1 | grep -E "LDAP|ldap|authenticate|SASL"
# MDS startup and group load
docker logs kafka-1 2>&1 | grep -E "MDS|metadata|group.*loaded"
# Authorization decisions
docker logs kafka-1 2>&1 | grep -E "Authoriz|RBAC|deny|allow"

Reset a Node (KRaft way – no ZK to wipe)
# Stop node
docker-compose stop kafka-2
# Wipe data and metadata logs
docker exec kafka-2 rm -rf /var/lib/kafka/data/* /var/lib/kafka/metadata/*
# Re-format with SAME cluster ID
docker exec kafka-2 kafka-storage format \
-t MkU3OEVBNTcwNTJENDM2Qg \
-c /etc/kafka/server.properties
# Restart
docker-compose start kafka-2

PART 13 – KRAFT + LDAP QUICK REFERENCE
Startup checklist:
1. Generate cluster ID: kafka-storage random-uuid
2. Format all nodes: kafka-storage format -t <id> -c server.properties
3. Generate MDS keypair: openssl genrsa -out tokenKeypair.pem 2048
4. Bootstrap LDAP: ldapadd -f bootstrap.ldif
5. Start nodes (all at once, no ZK ordering)
6. Verify quorum: kafka-metadata-quorum describe --status
7. Login to MDS: confluent login --url http://kafka-1:8090
8. Create RBAC bindings: confluent iam rbac role-binding create ...
KRaft-only mandatory configs vs ZK mode:
process.roles = broker | controller | broker,controller
node.id = unique integer per node (replaces broker.id)
controller.quorum.voters = id@host:port,id@host:port,...
metadata.log.dir = path for Raft log (separate from data)
CLUSTER_ID = must be identical across all nodes
What stayed the same:
- All ldap.* config keys (auth)
- All confluent.metadata.server.ldap.* keys (MDS/RBAC)
- All RBAC role binding commands
- Client SASL/PLAIN config
- LDAP user/group structure in OpenLDAP
The single biggest operational difference between ZooKeeper and KRaft is the cluster ID lifecycle: in ZK mode it was automatic; in KRaft you generate it once, format every node with it, and it never changes for the life of the cluster.