Authenticate Kong Event Gateway connections to Kafka using mTLS

TL;DR

Create a backend cluster with tls.enabled: true, supplying a CA bundle in tls.ca_bundle and a client certificate and key in tls.client_identity.
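
In API terms, the TLS portion of the backend cluster payload has this shape (placeholder values only; the full request is built with jq further down in this guide):

```json
{
  "tls": {
    "enabled": true,
    "ca_bundle": "-----BEGIN CERTIFICATE-----\n<PEM body>\n-----END CERTIFICATE-----\n",
    "client_identity": {
      "certificate": "-----BEGIN CERTIFICATE-----\n<PEM body>\n-----END CERTIFICATE-----\n",
      "key": "<base64-encoded private key>"
    }
  }
}
```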

Prerequisites

Install kafkactl. You’ll need it to interact with Kafka clusters.

If you don’t have a Konnect account, you can get started quickly with our onboarding wizard.

  1. The following Konnect items are required to complete this tutorial:
    • Personal access token (PAT): Create a new personal access token by opening the Konnect PAT page and selecting Generate Token.
  2. Set the personal access token as an environment variable:

    export KONNECT_TOKEN='YOUR KONNECT TOKEN'
    

In this guide you’ll configure Event Gateway to connect to a secured Kafka cluster by presenting a mutual TLS client certificate.

 
flowchart LR
    C[Kafka Client]
    subgraph EG [" Event Gateway "]
        VC2[mtls virtual cluster]
    end
    subgraph K [Kafka Cluster]
        L2["SSL :9088"]
    end
    C -->|anonymous| VC2
    VC2 -->|mTLS| L2
  

Generate TLS certificates

Generate a CA, per-broker JKS keystores, and a PEM client certificate for Event Gateway. This requires OpenSSL and keytool (included in any JDK installation).

  1. Create the output directory and generate a self-signed CA certificate:

     mkdir -p certs
     openssl req -new -x509 -nodes -keyout certs/ca.key -out certs/ca.crt \
       -days 365 -subj "/CN=Kafka-CA"
    
  2. Create the credentials file used as the password for all keystores, then import the CA into a shared truststore that all brokers will reference:

     echo "changeit" > certs/keystore-credentials
     keytool -import -trustcacerts -alias CARoot \
       -file certs/ca.crt -keystore certs/truststore.jks \
       -storepass changeit -noprompt
    
  3. For each broker, generate a key pair, sign it with the CA, and import both the CA certificate and the signed broker certificate into the broker’s keystore:

     for broker in kafka1 kafka2 kafka3; do
       keytool -genkeypair -alias "$broker" \
         -keyalg RSA -keysize 2048 -dname "CN=$broker" \
         -keystore "certs/$broker.keystore.jks" \
         -storepass changeit -keypass changeit -validity 365
    
       keytool -certreq -alias "$broker" \
         -keystore "certs/$broker.keystore.jks" \
         -storepass changeit -file "certs/$broker.csr"
       printf "subjectAltName=DNS:%s" "$broker" > "certs/$broker.ext"
       openssl x509 -req -in "certs/$broker.csr" \
         -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial \
         -out "certs/$broker.crt" -days 365 -extfile "certs/$broker.ext"
    
       keytool -import -trustcacerts -alias CARoot \
         -file certs/ca.crt -keystore "certs/$broker.keystore.jks" \
         -storepass changeit -noprompt
       keytool -import -alias "$broker" \
         -file "certs/$broker.crt" -keystore "certs/$broker.keystore.jks" \
         -storepass changeit -noprompt
     done
    
  4. Generate a PEM client certificate for Event Gateway:

     openssl genrsa -out certs/client.key 2048
     openssl req -new -key certs/client.key -out certs/client.csr -subj "/CN=event-gateway"
     openssl x509 -req -in certs/client.csr -CA certs/ca.crt -CAkey certs/ca.key \
       -CAcreateserial -out certs/client.crt -days 365
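
Before wiring these files into Kafka and Event Gateway, it's worth confirming that the client certificate actually chains back to the CA. The sketch below recreates a throwaway CA and client certificate in a temp directory purely to illustrate the check; in practice, point openssl verify at certs/ca.crt and certs/client.crt from the steps above.

```shell
# Illustrative chain-of-trust check in a scratch directory; substitute
# certs/ca.crt and certs/client.crt to verify the real files.
tmp=$(mktemp -d)
openssl req -new -x509 -nodes -keyout "$tmp/ca.key" -out "$tmp/ca.crt" \
  -days 1 -subj "/CN=Scratch-CA" 2>/dev/null
openssl genrsa -out "$tmp/client.key" 2048 2>/dev/null
openssl req -new -key "$tmp/client.key" -out "$tmp/client.csr" \
  -subj "/CN=event-gateway" 2>/dev/null
openssl x509 -req -in "$tmp/client.csr" -CA "$tmp/ca.crt" -CAkey "$tmp/ca.key" \
  -CAcreateserial -out "$tmp/client.crt" -days 1 2>/dev/null
# On success, openssl verify prints "<path>/client.crt: OK"
result=$(openssl verify -CAfile "$tmp/ca.crt" "$tmp/client.crt")
echo "$result"
rm -rf "$tmp"
```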
    

Start the secured Kafka cluster

Create the Docker Compose file:

cat <<'EOF' > docker-compose.yaml
name: kafka_cluster

networks:
  kafka:
    name: kafka_event_gateway

services:
  kafka1:
    image: apache/kafka:4.2.0
    networks:
      - kafka
    container_name: kafka1
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka1:9092,CONTROLLER://kafka1:9093,EXTERNAL://0.0.0.0:9094,SSL://0.0.0.0:9088
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9094,SSL://kafka1:9088
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
      KAFKA_SSL_KEY_CREDENTIALS: keystore-credentials
      KAFKA_SSL_KEYSTORE_CREDENTIALS: keystore-credentials
      KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: keystore-credentials
      KAFKA_SSL_CLIENT_AUTH: required
    volumes:
      - ./certs/kafka1.keystore.jks:/etc/kafka/secrets/kafka.keystore.jks
      - ./certs/truststore.jks:/etc/kafka/secrets/kafka.truststore.jks
      - ./certs/keystore-credentials:/etc/kafka/secrets/keystore-credentials

  kafka2:
    image: apache/kafka:4.2.0
    networks:
      - kafka
    container_name: kafka2
    ports:
      - "9095:9095"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka2:9092,CONTROLLER://kafka2:9093,EXTERNAL://0.0.0.0:9095,SSL://0.0.0.0:9088
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:9095,SSL://kafka2:9088
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
      KAFKA_SSL_KEY_CREDENTIALS: keystore-credentials
      KAFKA_SSL_KEYSTORE_CREDENTIALS: keystore-credentials
      KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: keystore-credentials
      KAFKA_SSL_CLIENT_AUTH: required
    volumes:
      - ./certs/kafka2.keystore.jks:/etc/kafka/secrets/kafka.keystore.jks
      - ./certs/truststore.jks:/etc/kafka/secrets/kafka.truststore.jks
      - ./certs/keystore-credentials:/etc/kafka/secrets/keystore-credentials

  kafka3:
    image: apache/kafka:4.2.0
    networks:
      - kafka
    container_name: kafka3
    ports:
      - "9096:9096"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka3:9092,CONTROLLER://kafka3:9093,EXTERNAL://0.0.0.0:9096,SSL://0.0.0.0:9088
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:9096,SSL://kafka3:9088
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,SSL:SSL
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      KAFKA_SSL_KEYSTORE_FILENAME: kafka.keystore.jks
      KAFKA_SSL_KEY_CREDENTIALS: keystore-credentials
      KAFKA_SSL_KEYSTORE_CREDENTIALS: keystore-credentials
      KAFKA_SSL_TRUSTSTORE_FILENAME: kafka.truststore.jks
      KAFKA_SSL_TRUSTSTORE_CREDENTIALS: keystore-credentials
      KAFKA_SSL_CLIENT_AUTH: required
    volumes:
      - ./certs/kafka3.keystore.jks:/etc/kafka/secrets/kafka.keystore.jks
      - ./certs/truststore.jks:/etc/kafka/secrets/kafka.truststore.jks
      - ./certs/keystore-credentials:/etc/kafka/secrets/keystore-credentials

EOF

Each broker exposes an SSL listener on port 9088 inside the Docker network for Event Gateway mTLS connections (client certificate required), plus a PLAINTEXT listener published to the host on ports 9094, 9095, and 9096 for direct local access.

Start the cluster:

docker compose up -d
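
The brokers can take a few seconds to finish startup. If you script against them, a small polling helper avoids racing the listeners; this is an illustrative bash-only sketch using /dev/tcp, not part of the quickstart.

```shell
# Poll a TCP port until it accepts connections. Bash-only (relies on /dev/tcp).
# Example: wait_for_port localhost 9094 30   # broker 1's PLAINTEXT listener
wait_for_port() {
  local host=$1 port=$2 retries=${3:-30}
  while ! (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; do
    retries=$((retries - 1))
    [ "$retries" -le 0 ] && return 1
    sleep 1
  done
}
```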

Create an Event Gateway control plane and data plane

Run the quickstart script to provision a local data plane and configure your environment:

curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway

Copy the exported variable into your terminal:

export EVENT_GATEWAY_ID=your-gateway-id

This quickstart script is intended for demo purposes only: it runs locally with mostly default parameters and a small number of exposed ports. To run Event Gateway as part of a production-ready platform, set up your control plane and data planes through the Konnect UI, or use Terraform.

Configure kafkactl

Create a kafkactl.yaml config file with contexts for direct Kafka access and the mTLS virtual cluster:

cat <<EOF > kafkactl.yaml
contexts:
  direct:
    brokers:
      - localhost:9094
      - localhost:9095
      - localhost:9096
  mtls-vc:
    brokers:
      - localhost:19096
EOF

Create a test topic using the direct context:

kafkactl -C kafkactl.yaml --context direct create topic orders

Create the backend cluster

The ca_bundle and client_identity.certificate fields accept PEM-encoded strings. The client_identity.key field requires a base64-encoded value.

Build the request body with TLS enabled:

jq -n \
  --rawfile ca_bundle certs/ca.crt \
  --rawfile certificate certs/client.crt \
  --arg key "$(base64 < certs/client.key)" \
  '{
    "name": "mtls_backend_cluster",
    "bootstrap_servers": ["kafka1:9088", "kafka2:9088", "kafka3:9088"],
    "authentication": {"type": "anonymous"},
    "insecure_allow_anonymous_virtual_cluster_auth": true,
    "tls": {
      "enabled": true,
      "ca_bundle": $ca_bundle,
      "client_identity": {"certificate": $certificate, "key": $key}
    }
  }' > mtls_backend_cluster.json
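
Before POSTing, you can sanity-check the shape of the generated file. The sketch below uses placeholder strings in place of the real PEM and base64 material; run the same jq filter against mtls_backend_cluster.json, where it exits non-zero if a field this guide relies on is missing.

```shell
# Validate the request body shape; placeholder strings stand in for real
# certificate material. Against the real file, run the same -e filter on
# mtls_backend_cluster.json before POSTing.
ok=$(jq -n '{
  name: "mtls_backend_cluster",
  tls: {
    enabled: true,
    ca_bundle: "-----BEGIN CERTIFICATE-----\nMIIB<placeholder>",
    client_identity: {
      certificate: "-----BEGIN CERTIFICATE-----\nMIIB<placeholder>",
      key: "QkFTRTY0<placeholder>"
    }
  }
}' | jq -e '
  .tls.enabled == true
  and (.tls.ca_bundle | startswith("-----BEGIN CERTIFICATE-----"))
  and (.tls.client_identity | has("certificate") and has("key"))')
echo "$ok"
```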

Then, create the backend cluster:

MTLS_BACKEND_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json @mtls_backend_cluster.json | jq -r ".id"
)

The ca_bundle lets Event Gateway verify the broker’s certificate. The client_identity holds the certificate and key that Event Gateway presents to Kafka during the TLS handshake.

Create a virtual cluster

Run the following command to create a virtual cluster with anonymous authentication:

MTLS_VC_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "mtls_vc",
       "destination": {
         "id": "'$MTLS_BACKEND_CLUSTER_ID'"
       },
       "dns_label": "mtls-vc",
       "authentication": [
         {
           "type": "anonymous"
         }
       ],
       "acl_mode": "passthrough"
     }' | jq -r ".id"
)

Create a listener

Run the following command to create a listener:

MTLS_LISTENER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "mtls_listener",
       "addresses": [
         "0.0.0.0"
       ],
       "ports": [
         "19096-19099"
       ]
     }' | jq -r ".id"
)

Create a listener policy

Add a Forward to Virtual Cluster policy, which forwards requests arriving on the listener's ports to the virtual cluster according to the defined mapping:

curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$MTLS_LISTENER_ID/policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "forward_to_virtual_cluster",
       "name": "forward_to_mtls_vc",
       "config": {
         "type": "port_mapping",
         "advertised_host": "localhost",
         "destination": {
           "id": "'$MTLS_VC_ID'"
         }
       }
     }'

Validate

List the topics through the mtls-vc virtual cluster:

kafkactl -C kafkactl.yaml --context mtls-vc list topics

The output lists the topic created earlier:

TOPIC     PARTITIONS     REPLICATION FACTOR
orders    1              1

Event Gateway completed the mTLS handshake with Kafka using the client certificate and forwarded the metadata request successfully.

Cleanup

When you’re done experimenting with this example, clean up the resources:

  1. If you created a new Event Gateway control plane and want to conserve your free trial credits or avoid unnecessary charges, delete the new control plane used in this tutorial.

  2. Stop and remove the containers:

    docker compose down
    

This will stop all services and remove the containers, but preserve your configuration files for future use.
