Encrypt and decrypt Kafka messages with Kong Event Gateway

TL;DR

Generate a key and create a static key entity, then create Encrypt and Decrypt policies to enable message encryption and decryption.

Prerequisites

Install kafkactl. You’ll need it to interact with Kafka clusters.

Start a Docker Compose cluster with multiple Kafka services.

First, we need to create a docker-compose.yaml file. This file will define the services we want to run in our local environment:

cat <<EOF > docker-compose.yaml
name: kafka_cluster

networks:
  kafka:
    name: kafka_event_gateway

services:
  kafka1:
    image: apache/kafka:3.7.0
    networks:
      - kafka
    container_name: kafka1
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka1:9092,CONTROLLER://kafka1:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs

  kafka2:
    image: apache/kafka:3.7.0
    networks:
      - kafka
    container_name: kafka2
    ports:
      - "9095:9095"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka2:9092,CONTROLLER://kafka2:9093,EXTERNAL://0.0.0.0:9095
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs

  kafka3:
    image: apache/kafka:3.7.0
    networks:
      - kafka
    container_name: kafka3
    ports:
      - "9096:9096"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka3:9092,CONTROLLER://kafka3:9093,EXTERNAL://0.0.0.0:9096
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:9096
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
EOF

Now, let’s start the local setup:

docker compose up -d

If you don’t have a Konnect account, you can get started quickly with our onboarding wizard.

  1. The following Konnect items are required to complete this tutorial:
    • Personal access token (PAT): Create a new personal access token by opening the Konnect PAT page and selecting Generate Token.
  2. Set the personal access token as an environment variable:

    export KONNECT_TOKEN='YOUR KONNECT TOKEN'
    

Run the quickstart script to automatically provision a demo Kong Gateway control plane and data plane, and configure your environment:

curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway

This sets up a Kong Event Gateway control plane named event-gateway-quickstart, provisions a local data plane, and prints out the following environment variable export:

export EVENT_GATEWAY_ID=your-gateway-id

Copy and paste this into your terminal to configure your session.

This quickstart script is meant for demo purposes only, so it runs locally with default parameters and a small number of exposed ports. If you want to run Kong Event Gateway as part of a production-ready platform, set up your control plane and data planes through the Konnect UI, or using Terraform.

Configure a Kafka cluster

Now that the gateway is running, let’s make sure the Kafka cluster is reachable.

In your local environment, set up a kafkactl.yaml config file with two contexts: direct, which connects straight to the Kafka brokers, and vc, which goes through the Event Gateway:

cat <<EOF > kafkactl.yaml
contexts:
  direct:
    brokers:
      - localhost:9094
      - localhost:9095
      - localhost:9096
  vc:
    brokers:
      - localhost:19092
EOF

Add a backend cluster

Run the following command to create a new backend cluster:

BACKEND_CLUSTER_ID=$( curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "default_backend_cluster",
       "bootstrap_servers": [
         "kafka1:9092",
         "kafka2:9092",
         "kafka3:9092"
       ],
       "authentication": {
         "type": "anonymous"
       },
       "insecure_allow_anonymous_virtual_cluster_auth": true,
       "tls": {
         "enabled": false
       }
     }' | jq -r ".id")

Add a virtual cluster

Run the following command to create a new virtual cluster associated with our backend cluster:

VIRTUAL_CLUSTER_ID=$( curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "example_virtual_cluster",
       "destination": {
         "id": "'$BACKEND_CLUSTER_ID'"
       },
       "dns_label": "vcluster-1",
       "authentication": [
         {
           "type": "anonymous"
         }
       ],
       "acl_mode": "passthrough"
     }' | jq -r ".id")

Add a listener

A listener defines the hostname-and-port or IP-and-port combinations on which the gateway accepts TCP connections. In this example, we’re going to use port mapping, so we need to expose a range of ports.

Run the following command to create a new listener:

LISTENER_ID=$( curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "example_listener",
       "addresses": [
         "0.0.0.0"
       ],
       "ports": [
         "19092-19095"
       ]
     }' | jq -r ".id")

Add a listener policy

The listener needs a policy that tells it how to process incoming requests. In this example, we’re going to use the Forward to Virtual Cluster policy, which forwards requests to our virtual cluster based on a defined mapping.

Run the following command to add the listener policy:

 curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "forward_to_virtual_cluster",
       "name": "forward",
       "config": {
         "type": "port_mapping",
         "advertised_host": "localhost",
         "destination": {
           "id": "'$VIRTUAL_CLUSTER_ID'"
         }
       }
     }'

For demo purposes, we’re using port mapping, which assigns each Kafka broker to a dedicated port on the Event Gateway. In production, we recommend using SNI routing instead.
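
The port-mapping scheme can be sketched as follows. Note this mapping is illustrative: the gateway assigns brokers to ports from the range we opened on the listener, and the exact assignment is made at runtime.

```shell
# Illustrative only: under port mapping, each backend broker is advertised to
# clients on its own port from the listener's 19092-19095 range.
brokers=(kafka1 kafka2 kafka3)
port=19092
for broker in "${brokers[@]}"; do
  echo "$broker -> localhost:$port"
  port=$((port + 1))
done
```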

Generate a key

Use OpenSSL to generate a random 256-bit key that will be used to encrypt and decrypt messages:

export MY_KEY=$(openssl rand -base64 32)
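
The command above produces 32 random bytes (256 bits), base64-encoded. As a quick sanity check, you can decode the key and confirm its length:

```shell
# The key should decode to exactly 32 bytes (a fresh key is generated here
# only if MY_KEY is not already set in your shell).
MY_KEY=${MY_KEY:-$(openssl rand -base64 32)}
printf '%s' "$MY_KEY" | base64 -d | wc -c
```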

Add a static key

Run the following command to create a new static key named my-key with the key we generated:

 curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/static-keys" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "my-key",
       "value": "'$MY_KEY'"
     }'

Add an Encrypt policy

Use the following command to create an Encrypt policy to enable encryption of messages:

 curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "encrypt-static-key",
       "type": "encrypt",
       "config": {
         "failure_mode": "passthrough",
         "part_of_record": [
           "value"
         ],
         "encryption_key": {
           "type": "static",
           "key": {
             "name": "my-key"
           }
         }
       }
     }'

Add a Decrypt policy

Use the following command to create a Decrypt policy to enable decryption of messages:

 curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "decrypt-static-key",
       "type": "decrypt",
       "config": {
         "failure_mode": "passthrough",
         "part_of_record": [
           "value"
         ],
         "key_sources": [
           {
             "type": "static"
           }
         ]
       }
     }'

Validate

Let’s check that encryption and decryption work. First, create a topic using the direct context, which is a direct connection to our Kafka cluster:

kafkactl -C kafkactl.yaml --context direct create topic my-test-topic

Produce a message using the vc context, which routes through the virtual cluster and should encrypt the message:

kafkactl -C kafkactl.yaml --context vc produce my-test-topic --value="Hello World"

You should see the following response:

message produced (partition=0	offset=0)

Now let’s verify that the message was encrypted by consuming it directly from the Kafka cluster:

kafkactl -C kafkactl.yaml --context direct consume my-test-topic --exit --output json --from-beginning --print-headers

You should see a response similar to the following (the encrypted Value will differ):

{
	"Partition": 0,
	"Offset": 0,
	"Headers": {
		"kong/enc": "\u0000\u0001\u0000-static://<static-key-id>"
	},
	"Value": "deJ415liQWUEP8j33Yrb/7knuwRzHrHNRDQkkePePZ18MShhlY9A++ZFH/9uaHRb+Q=="
}

The Encrypt policy appends a kong/enc header to each message. This header identifies the encryption key by its ID.
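
For example, you can pull the key reference out of that header with jq. The record below is a hypothetical one mirroring the sample output above, with "my-key-id" standing in for the real key ID:

```shell
# Hypothetical record; in practice, pipe kafkactl's JSON output into jq instead.
record='{"Headers":{"kong/enc":"\u0000\u0001\u0000-static://my-key-id"}}'
printf '%s' "$record" | jq -r '.Headers["kong/enc"] | split("static://") | .[1]'
# prints: my-key-id
```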

Now let’s verify that the Decrypt policy works by consuming the message through the virtual cluster:

kafkactl -C kafkactl.yaml --context vc consume my-test-topic --from-beginning  --exit

The output should contain the decrypted message:

Hello World

Cleanup

When you’re done experimenting with this example, clean up the resources:

  1. If you created a new Event Gateway control plane and want to conserve your free trial credits or avoid unnecessary charges, delete the new control plane used in this tutorial.

  2. Stop and remove the containers:

    docker compose down
    

This will stop all services and remove the containers, but preserve your configuration files for future use.
