If you need to encrypt and decrypt whole Kafka messages instead of specific fields, use the Decrypt and Encrypt policies. See Encrypt and decrypt Kafka messages for a complete how-to guide.
Encrypt and decrypt Kafka fields dynamically in message values with Kong Event Gateway
Generate a key and create a static key entity, then create field encryption and field decryption policies with path expressions to enable message encryption and decryption.
Prerequisites
Install kafkactl
Install kafkactl. You’ll need it to interact with Kafka clusters.
Start a local Kafka cluster
Start a Docker Compose cluster with multiple Kafka services.
First, we need to create a docker-compose.yaml file. This file will define the services we want to run in our local environment:
cat <<EOF > docker-compose.yaml
name: kafka_cluster
networks:
  kafka:
    name: kafka_event_gateway
services:
  kafka1:
    image: apache/kafka:4.3.0
    networks:
      - kafka
    container_name: kafka1
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka1:9092,CONTROLLER://kafka1:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
  kafka2:
    image: apache/kafka:4.3.0
    networks:
      - kafka
    container_name: kafka2
    ports:
      - "9095:9095"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka2:9092,CONTROLLER://kafka2:9093,EXTERNAL://0.0.0.0:9095
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
  kafka3:
    image: apache/kafka:4.3.0
    networks:
      - kafka
    container_name: kafka3
    ports:
      - "9096:9096"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka3:9092,CONTROLLER://kafka3:9093,EXTERNAL://0.0.0.0:9096
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:9096
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
  schema-registry:
    image: confluentinc/cp-schema-registry:8.2.1
    networks:
      - kafka
    container_name: schema-registry
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
EOF
Now, let’s start the local setup:
docker compose up -d
Kong Konnect
If you don’t have a Konnect account, you can get started quickly with our onboarding wizard.
The following Konnect items are required to complete this tutorial:
- Personal access token (PAT): Create a new personal access token by opening the Konnect PAT page and selecting Generate Token.
Set the personal access token as an environment variable:
export KONNECT_TOKEN='YOUR KONNECT TOKEN'
Kong Event Gateway running
Run the quickstart script to automatically provision a demo Kong Gateway control plane and data plane, and configure your environment:
curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway
This sets up a Kong Gateway control plane named event-gateway-quickstart, provisions a local data plane, and prints out the following environment variable export:
export EVENT_GATEWAY_ID=your-gateway-id
Copy and paste the command with your Event Gateway ID into your terminal to configure your session.
This quickstart script is meant for demo purposes only, so it runs locally with mostly default parameters and a small number of exposed ports. If you want to run Kong Gateway as part of a production-ready platform, set up your control plane and data planes through the Konnect UI or with Terraform.
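Every later command depends on the variables exported so far, so a small guard may save some debugging. The sketch below is not part of the quickstart: check_env is a hypothetical helper that reports any exported variable that is unset, empty, or the literal string null (which is what jq -r prints when a field is missing from a response).

```shell
# Hypothetical helper: fail if any named environment variable is unset,
# empty, or the literal string "null" (jq -r output for a missing field).
check_env() {
  for name in "$@"; do
    value=$(printenv "$name" || true)
    if [ -z "$value" ] || [ "$value" = "null" ]; then
      echo "Missing or empty: $name" >&2
      return 1
    fi
  done
  echo "All set: $*"
}

# Check the two variables exported so far in this tutorial.
check_env KONNECT_TOKEN EVENT_GATEWAY_ID || echo "Export the missing variables before continuing."
```

The same helper can be reused after the later steps that export IDs extracted with jq, for example check_env AUTH_SERVER_ID ISSUER_URL. Because it relies on printenv, it only sees variables that have been exported.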
Create an auth server in Kong Identity
Before you can configure dynamic field decryption based on authentication, you must first create an auth server in Kong Identity. We recommend creating different auth servers for different environments or subsidiaries. The auth server name must be unique within each organization and Konnect region.
Create an auth server using the /v1/auth-servers endpoint:
_response=$(curl -X POST "https://us.api.konghq.com/v1/auth-servers" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
-H "Content-Type: application/json" \
--json '{
"name": "Appointments Dev",
"audience": "http://myhttpbin.dev",
"description": "Auth server for the Appointment dev environment"
}')
Export the environment variables:
export AUTH_SERVER_ID=$(echo "$_response" | jq -r ".id")
export ISSUER_URL=$(echo "$_response" | jq -r ".issuer")
Create a client in the auth server
The client is the machine-to-machine credential. In this tutorial, Konnect will autogenerate the client ID and secret, but you can alternatively specify one yourself.
Configure the client using the /v1/auth-servers/$AUTH_SERVER_ID/clients endpoint:
_response=$(curl -X POST "https://us.api.konghq.com/v1/auth-servers/$AUTH_SERVER_ID/clients" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
-H "Content-Type: application/json" \
--json '{
"name": "Client",
"grant_types": [
"client_credentials"
],
"allow_all_scopes": true,
"access_token_duration": 3600,
"id_token_duration": 3600,
"response_types": [
"id_token",
"token"
]
}')
Export the environment variables:
export CLIENT_SECRET=$(echo "$_response" | jq -r ".client_secret")
export CLIENT_ID=$(echo "$_response" | jq -r ".id")
Add a backend cluster
Run the following command to create a new backend cluster:
BACKEND_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "default_backend_cluster",
"bootstrap_servers": [
"kafka1:9092",
"kafka2:9092",
"kafka3:9092"
],
"authentication": {
"type": "anonymous"
},
"insecure_allow_anonymous_virtual_cluster_auth": true,
"tls": {
"enabled": false
}
}' | jq -r ".id"
)
Add a virtual cluster
Run the following command to create a new virtual cluster associated with our backend cluster:
VIRTUAL_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "example_virtual_cluster",
"destination": {
"id": "'$BACKEND_CLUSTER_ID'"
},
"dns_label": "vcluster-1",
"authentication": [
{
"type": "anonymous"
},
{
"type": "oauth_bearer",
"mediation": "terminate",
"jwks": {
"endpoint": "'$ISSUER_URL'/.well-known/jwks"
}
}
],
"acl_mode": "passthrough"
}' | jq -r ".id"
)
Add a listener
A listener represents hostname-port or IP-port combinations that connect to TCP sockets. In this example, we’re going to use port mapping, so we need to expose a range of ports.
Run the following command to create a new listener:
LISTENER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "example_listener",
"addresses": [
"0.0.0.0"
],
"ports": [
"19092-19095"
]
}' | jq -r ".id"
)
Add a listener policy
The listener needs a policy to tell it how to process requests and what to do with them. In this example, we’re going to use the Forward to Virtual Cluster policy, which will forward requests based on a defined mapping to our virtual cluster.
Run the following command to add the listener policy:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "forward_to_virtual_cluster",
"name": "forward",
"config": {
"type": "port_mapping",
"advertised_host": "localhost",
"destination": {
"id": "'$VIRTUAL_CLUSTER_ID'"
}
}
}'
For demo purposes, we’re using port mapping, which assigns each Kafka broker to a dedicated port on the Event Gateway. In production, we recommend using SNI routing instead.
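To confirm that the listener ports are reachable from your machine, a quick probe such as the following may help. This is only a sketch: it assumes nc (netcat) is installed and that the local data plane publishes the 19092-19095 range on localhost, as configured for the listener above. check_port is a hypothetical helper name.

```shell
# Hypothetical helper: succeeds if a TCP connection to host:port opens within 1 second.
check_port() {
  nc -z -w 1 "$1" "$2" >/dev/null 2>&1
}

# Probe every port in the listener range used in this tutorial.
PORT_REPORT=$(
  for port in 19092 19093 19094 19095; do
    if check_port localhost "$port"; then
      echo "port $port: open"
    else
      echo "port $port: closed"
    fi
  done
)
echo "$PORT_REPORT"
```

A closed port here is not necessarily an error; it only tells you that nothing accepted a connection on that port at the time of the probe.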
Generate a key
Use OpenSSL to generate the key that will be used to encrypt and decrypt messages:
export MY_KEY="$(openssl rand -base64 32)"
Add a static key
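Before registering the key, it may be worth confirming that it decodes to 32 bytes, which is what openssl rand -base64 32 produces. The sketch below assumes openssl is available; the fallback that generates a key when MY_KEY is unset is there only so the snippet runs standalone.

```shell
# Use the key exported earlier, or generate one so this snippet is self-contained.
MY_KEY="${MY_KEY:-$(openssl rand -base64 32)}"

# Decode the base64 string and count the resulting bytes.
KEY_BYTES=$(printf '%s' "$MY_KEY" | openssl base64 -d -A | wc -c | tr -d ' ')
echo "Decoded key length: $KEY_BYTES bytes"
```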
Run the following command to create a new static key named my-key with the key we generated:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/static-keys" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "my-key",
"value": "'$MY_KEY'"
}'
Create a Schema Validation produce policy
Create a Schema Validation policy that validates that all produced values are JSON encoded.
PRODUCE_SCHEMA_VALIDATION_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "schema_validation",
"name": "produce_validate_json",
"config": {
"type": "json",
"value_validation_action": "reject"
}
}' | jq -r ".id"
)
The value_validation_action: reject setting ensures that the entire batch containing an invalid message is rejected, and the producer receives an error.
Alternatively, you can use mark, which passes the message to the broker but adds a kong/sverr-value header to flag it as invalid.
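To catch rejections before they happen, you could test a payload locally before producing it. The following sketch uses jq as a stand-in JSON parser; the gateway's own validation may differ in edge cases, and is_valid_json is a hypothetical helper name.

```shell
# Hypothetical helper: succeeds if the argument parses as JSON according to jq.
is_valid_json() {
  printf '%s' "$1" | jq empty >/dev/null 2>&1
}

# Try one well-formed payload and one malformed payload.
for payload in '{"personal": {"ssn": "100-00-00001"}}' '{"personal": oops}'; do
  if is_valid_json "$payload"; then
    echo "valid JSON:   $payload"
  else
    echo "invalid JSON: $payload"
  fi
done
```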
Add a field encryption policy
Use the following command to create a field encryption policy to enable encryption of one field in the JSON value:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "encrypt-fields-static-key",
"parent_policy_id": "'$PRODUCE_SCHEMA_VALIDATION_ID'",
"type": "encrypt_fields",
"config": {
"failure_mode": "reject",
"encrypt_fields": [
{
"paths": [
{
"match": "personal.ssn"
}
],
"encryption_key": {
"type": "static",
"key": {
"name": "my-key"
}
}
}
]
}
}'
Create a Schema Validation consume policy
Create a Schema Validation policy that validates that all consumed values are JSON encoded.
CONSUME_SCHEMA_VALIDATION_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "schema_validation",
"name": "consume_validate_json",
"config": {
"type": "json",
"value_validation_action": "mark"
}
}' | jq -r ".id"
)
The value_validation_action: mark setting passes the message on to the consumer but adds a kong/sverr-value header to flag it as invalid.
Add a field decryption policy
This field decryption policy is where Event Gateway will make a dynamic decision on which fields to decrypt based on the authenticated user:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "decrypt-fields-static-key",
"parent_policy_id": "'$CONSUME_SCHEMA_VALIDATION_ID'",
"type": "decrypt_fields",
"config": {
"failure_mode": "error",
"key_sources": [
{
"type": "static"
}
],
"decrypt_fields": {
"paths": "context.auth.principal.name == \"'$CLIENT_ID'\" ? [\"personal.ssn\"] : []"
}
}
}'
Set up kafkactl to use OAuth
This step requires kafkactl version 5.17.0 or later. To check your version, run kafkactl version.
Note that this script is for demo purposes only and hard-codes client ID and client secret. For production, we recommend securing sensitive data.
kafkactl will generate tokens using a script. Let’s create the script:
cat <<EOF > get-oauth-token.sh
#!/bin/bash
curl -s --fail -X POST "$ISSUER_URL/oauth/token" \
-H "Content-Type: application/x-www-form-urlencoded" \
-d "grant_type=client_credentials" \
-d "client_id=$CLIENT_ID" \
-d "client_secret=$CLIENT_SECRET" | jq -r '{"token": .access_token}'
EOF
chmod u+x get-oauth-token.sh
Next, create a kafkactl configuration with both non-authenticated and authenticated access:
cat <<EOF > kafkactl.yaml
contexts:
  direct:
    brokers:
      - localhost:9095
      - localhost:9096
      - localhost:9094
  vc:
    brokers:
      - localhost:19092
  vc-oauth:
    sasl:
      enabled: true
      mechanism: oauth
      tokenprovider:
        plugin: generic
        options:
          script: ./get-oauth-token.sh
          args: []
    brokers:
      - localhost:19092
EOF
Validate
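If the OAuth context fails to authenticate later on, decoding the payload of the token returned by get-oauth-token.sh can help you see which claims it carries. The sketch below assumes the access token is a JWT (three dot-separated base64url segments) and that openssl is available. jwt_payload is a hypothetical helper, and the sample token with a sub claim is built locally for illustration only; it is not a real Konnect token.

```shell
# Hypothetical helper: print the decoded payload (second segment) of a JWT.
jwt_payload() {
  # Convert base64url to standard base64.
  segment=$(printf '%s' "$1" | cut -d. -f2 | tr '_-' '/+')
  # Restore the padding that base64url encoding strips.
  case $(( ${#segment} % 4 )) in
    2) segment="${segment}==" ;;
    3) segment="${segment}=" ;;
  esac
  printf '%s' "$segment" | openssl base64 -d -A
}

# Self-contained demo with a locally built, unsigned sample token.
SAMPLE_PAYLOAD=$(printf '%s' '{"sub":"example-client"}' | openssl base64 -A | tr '/+' '_-' | tr -d '=')
SAMPLE_TOKEN="header.${SAMPLE_PAYLOAD}.signature"
jwt_payload "$SAMPLE_TOKEN"
```

With the real script, something like jwt_payload "$(./get-oauth-token.sh | jq -r .token)" should print the claims of a freshly issued token. This only inspects the payload; it does not verify the signature.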
Let’s check that the encryption/decryption works.
First, create a topic using the direct context, which is a direct connection to our Kafka cluster:
kafkactl -C kafkactl.yaml --context direct create topic my-test-topic
Produce a message using the vc context, which should encrypt the field:
kafkactl -C kafkactl.yaml --context vc produce my-test-topic --value='{"personal": {"ssn": "100-00-00001"}}'
You should see the following response:
message produced (partition=0 offset=0)
Now let’s verify that the message was encrypted by consuming the message directly:
kafkactl -C kafkactl.yaml --context direct consume my-test-topic --exit --output json --from-beginning --print-headers
You should see the following response:
{
"Partition": 0,
"Offset": 0,
"Headers": {
"kong/enc": "\u0000\u0004\u0000-static://<static-key-id>"
},
"Value": "{\"personal\":{\"ssn\":\"AHry69Jl4oJzafOlu/xOjVa37hpfYTAVXoAolj94NoBQSKz7dkEF/gg=\"}}"
}
The field encryption policy appends a kong/enc header to each message. This header identifies the encryption key by its ID.
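To check this programmatically rather than by eye, you could parse the record and compare the stored field with the plaintext you produced. The sketch below uses jq on a sample record copied from the output above (headers omitted); in practice you would pipe the kafkactl JSON output into jq instead. A differing value only shows that the field was transformed, not that the encryption itself is sound.

```shell
# Sample record copied from the consume output above (headers omitted).
RECORD='{"Partition":0,"Offset":0,"Value":"{\"personal\":{\"ssn\":\"AHry69Jl4oJzafOlu/xOjVa37hpfYTAVXoAolj94NoBQSKz7dkEF/gg=\"}}"}'
PLAINTEXT_SSN='100-00-00001'

# The Value field is itself a JSON string, so parse it a second time with fromjson.
STORED_SSN=$(printf '%s' "$RECORD" | jq -r '.Value | fromjson | .personal.ssn')

if [ "$STORED_SSN" != "$PLAINTEXT_SSN" ]; then
  echo "personal.ssn differs from the plaintext value"
else
  echo "personal.ssn is stored in plaintext"
fi
```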
Now let’s verify that the field decryption policy works by consuming the message through the virtual cluster with OAuth:
kafkactl -C kafkactl.yaml --context vc-oauth consume my-test-topic --from-beginning --exit
The output should look like this, with the value decrypted:
{"personal": {"ssn": "100-00-00001"}}
We can then see that our field remains encrypted if we aren’t authenticated:
kafkactl -C kafkactl.yaml --context vc consume my-test-topic --from-beginning --exit
The output should look like this, with the value obfuscated by encryption:
{"personal": {"ssn": "..."}}
Cleanup
Clean up Kong Event Gateway resources
When you’re done experimenting with this example, clean up the resources:
- If you created a new Event Gateway control plane and want to conserve your free trial credits or avoid unnecessary charges, delete the new control plane used in this tutorial.
- Stop and remove the containers:
docker compose down
This will stop all services and remove the containers, but preserve your configuration files for future use.