Filter Kafka records by classification headers
In this tutorial, you'll:
- Create a Schema Validation policy (produce phase) to parse JSON records.
- Nest a Modify Headers policy to classify records with a header based on content.
- Create a Skip Records policy (consume phase) to filter records based on the header and principal name.
Prerequisites
Install kafkactl
Install kafkactl. You’ll need it to interact with Kafka clusters.
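For example, if you use Homebrew, one way to install it is shown below; see the kafkactl documentation for other installation options:
brew install kafkactl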
Define a context for kafkactl
Let’s define a context we can use to create Kafka topics:
cat <<EOF > kafkactl.yaml
contexts:
  direct:
    brokers:
      - localhost:9095
      - localhost:9096
      - localhost:9094
EOF
Start a local Kafka cluster
Start a Docker Compose cluster with multiple Kafka services.
First, we need to create a docker-compose.yaml file. This file will define the services we want to run in our local environment:
cat <<EOF > docker-compose.yaml
name: kafka_cluster
networks:
  kafka:
    name: kafka_event_gateway
services:
  kafka1:
    image: apache/kafka:4.1.1
    networks:
      - kafka
    container_name: kafka1
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka1:9092,CONTROLLER://kafka1:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
  kafka2:
    image: apache/kafka:4.1.1
    networks:
      - kafka
    container_name: kafka2
    ports:
      - "9095:9095"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka2:9092,CONTROLLER://kafka2:9093,EXTERNAL://0.0.0.0:9095
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
  kafka3:
    image: apache/kafka:4.1.1
    networks:
      - kafka
    container_name: kafka3
    ports:
      - "9096:9096"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka3:9092,CONTROLLER://kafka3:9093,EXTERNAL://0.0.0.0:9096
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:9096
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
EOF
Now, let’s start the local setup:
docker compose up -d
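Before moving on, you can check that all three brokers are up. The first command is plain Docker Compose; the second uses the kafkactl context defined above and should list the three brokers:
docker compose ps
kafkactl -C kafkactl.yaml --context direct get brokers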
Kong Konnect
If you don’t have a Konnect account, you can get started quickly with our onboarding wizard.
The following Konnect items are required to complete this tutorial:
- Personal access token (PAT): Create a new personal access token by opening the Konnect PAT page and selecting Generate Token.
- Set the personal access token as an environment variable:
export KONNECT_TOKEN='YOUR KONNECT TOKEN'
Kong Event Gateway running
Run the quickstart script to automatically provision a demo Kong Event Gateway control plane and data plane, and configure your environment:
curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway
This sets up a Kong Event Gateway control plane named event-gateway-quickstart, provisions a local data plane, and prints out the following environment variable export:
export EVENT_GATEWAY_ID=your-gateway-id
Copy and paste the command with your Event Gateway ID into your terminal to configure your session.
This quickstart script is meant for demo purposes only, so it runs locally with mostly default parameters and a small number of exposed ports. If you want to run Kong Event Gateway as part of a production-ready platform, set up your control plane and data planes through the Konnect UI, or using Terraform.
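The quickstart also starts a local Event Gateway data plane next to your Kafka containers. If you want to confirm it's running, docker ps should show an additional container for the data plane (its exact name depends on the quickstart script):
docker ps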
Overview
In this guide, you’ll learn how to classify Kafka records at produce time and filter them at consume time based on user identity.
We’ll use a logging scenario where an app_logs topic contains log entries with different severity levels, aimed at two different groups of users:
- The SRE team needs debug and trace logs, which are verbose and useful for troubleshooting issues.
- Regular developers only need info, warn, and error logs.
The approach uses two policies (see the example after this list):
- Produce phase: A Schema Validation policy parses JSON records, and a nested Modify Headers policy adds an x-internal: true header to debug and trace logs.
- Consume phase: A Skip Records policy filters out internal logs for users who aren't on the SRE team.
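As a concrete example, a debug record is produced as plain JSON:
{"level": "debug", "message": "Loading configuration"}
After the produce policy chain runs, an SRE consumer sees the same record with the classification header (shown here in the headers#value format that kafkactl --print-headers uses later in this guide), while a developer consumer never receives it at all:
x-internal:true#{"level": "debug", "message": "Loading configuration"}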
Here’s how the data flows through the system:
flowchart LR
    P[Producer] --> SV
    subgraph produce [Event Gateway Produce policy chain]
        SV["Schema Validation: parse JSON"] --> MH{"Modify Headers: level = debug/trace?"}
        MH -->|Yes| H1["Add x-internal: true"]
        MH -->|No| H2["No header added"]
    end
    subgraph consume [Event Gateway Consume policy chain]
        SR{"Skip Records: x-internal = true AND user != sre_user?"}
        SR -->|Yes| DROP["Record skipped"]
        SR -->|No| C["Send to consumer"]
    end
    H1 --> K["Kafka broker"]
    H2 --> K
    K --> SR
    C --> CO[Consumer]
Performance tip: Classifying records at produce time is more efficient than at consume time. Parsing JSON once during production avoids repeated deserialization for each consumer group.
Create a Kafka topic
Create an app_logs topic in the Kafka cluster, using the direct context defined earlier:
kafkactl -C kafkactl.yaml --context direct create topic app_logs
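Optionally, verify the topic exists by listing topics through the same context:
kafkactl -C kafkactl.yaml --context direct get topics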
Create a backend cluster
Use the following command to create a backend cluster that connects to the Kafka servers you set up:
BACKEND_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "backend_cluster",
"bootstrap_servers": [
"kafka1:9092",
"kafka2:9092",
"kafka3:9092"
],
"authentication": {
"type": "anonymous"
},
"tls": {
"enabled": false
}
}' | jq -r ".id"
)
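The response's id field is captured in the BACKEND_CLUSTER_ID variable. As a quick sanity check, print it and confirm it isn't empty; the same check applies to the virtual cluster and listener IDs captured in the next steps:
echo "$BACKEND_CLUSTER_ID"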
Create a virtual cluster
Create a virtual cluster with two users (principals):
- sre_user: Can see all logs, including debug and trace
- dev_user: Only sees info, warn, and error logs
VIRTUAL_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "logs_vc",
"destination": {
"id": "'$BACKEND_CLUSTER_ID'"
},
"dns_label": "logs",
"authentication": [
{
"type": "sasl_plain",
"mediation": "terminate",
"principals": [
{
"username": "sre_user",
"password": "sre_password"
},
{
"username": "dev_user",
"password": "dev_password"
}
]
}
],
"acl_mode": "passthrough"
}' | jq -r ".id"
)
Create a listener with a forwarding policy
Create a listener to accept connections:
LISTENER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "logs_listener",
"addresses": [
"0.0.0.0"
],
"ports": [
"19092-19095"
]
}' | jq -r ".id"
)
Create a port mapping policy to forward traffic to the virtual cluster:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "forward_to_virtual_cluster",
"name": "forward_to_logs_vc",
"config": {
"type": "port_mapping",
"advertised_host": "localhost",
"destination": {
"id": "'$VIRTUAL_CLUSTER_ID'"
}
}
}'
Create a Schema Validation policy
Create a Schema Validation policy that parses JSON records during the produce phase. This allows nested policies to access record content:
SCHEMA_VALIDATION_POLICY_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "schema_validation",
"name": "parse_json_logs",
"config": {
"type": "json",
"value_validation_action": "reject"
}
}' | jq -r ".id"
)
The value_validation_action: reject setting ensures data quality: if a producer sends a record that isn’t valid JSON, the entire batch containing that record is rejected and the producer receives an error.
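If you want to see the rejection behavior for yourself, come back after you've created the logs-cluster.yaml configuration later in this guide and try producing a record that isn't valid JSON. The produce request should fail with an error instead of the record being written; the exact message depends on your client and gateway versions:
echo 'not valid json' | kafkactl -C logs-cluster.yaml --context sre produce app_logs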
Create a Modify Headers policy to classify logs
Create a Modify Headers policy nested under the Schema Validation policy. This policy adds an x-internal: true header when the log level is debug or trace:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "modify_headers",
"name": "classify_internal_logs",
"parent_policy_id": "'$SCHEMA_VALIDATION_POLICY_ID'",
"condition": "record.value.content[\"level\"] == \"debug\" || record.value.content[\"level\"] == \"trace\"",
"config": {
"actions": [
{
"op": "set",
"key": "x-internal",
"value": "true"
}
]
}
}'
Create a Skip Records policy to filter logs
Create a Skip Records policy that filters out internal logs for non-SRE users during the consume phase:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/consume-policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "skip_record",
"name": "filter_internal_logs",
"condition": "record.headers[\"x-internal\"] == \"true\" && context.auth.principal.name != \"sre_user\""
}'
Configure kafkactl
Create a kafkactl configuration with contexts for both users:
cat <<EOF > logs-cluster.yaml
contexts:
  sre:
    brokers:
      - localhost:19092
    sasl:
      enabled: true
      username: sre_user
      password: sre_password
  dev:
    brokers:
      - localhost:19092
    sasl:
      enabled: true
      username: dev_user
      password: dev_password
EOF
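Before producing any records, you can confirm that the listener and SASL credentials work by connecting through the virtual cluster as either user, for example by listing topics:
kafkactl -C logs-cluster.yaml --context sre get topics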
Validate
Now, let’s validate that you can produce and consume log records as the two different user profiles.
Produce log records
Produce log records with different severity levels:
echo '{"level": "info", "message": "Application started"}
{"level": "debug", "message": "Loading configuration from /etc/app/config.yaml"}
{"level": "error", "message": "Failed to connect to database"}
{"level": "trace", "message": "Entering function processRequest()"}
{"level": "warn", "message": "High memory usage detected"}' | kafkactl -C logs-cluster.yaml --context sre produce app_logs
We’ve produced 5 log records:
- 2 internal logs (debug, trace), which will be classified with x-internal: true
- 3 regular logs (info, error, warn), which get no classification header
Consume as SRE user
Consume logs as the SRE user. You should see all 5 records:
kafkactl -C logs-cluster.yaml --context sre consume app_logs --from-beginning --exit --print-headers
The output includes all logs, with the x-internal:true header on the debug and trace entries:
#{"level": "info", "message": "Application started"}
x-internal:true#{"level": "debug", "message": "Loading configuration from /etc/app/config.yaml"}
#{"level": "error", "message": "Failed to connect to database"}
x-internal:true#{"level": "trace", "message": "Entering function processRequest()"}
#{"level": "warn", "message": "High memory usage detected"}
Consume as developer user
Consume logs as the developer user. You should only see 3 records (debug and trace logs are filtered out):
kafkactl -C logs-cluster.yaml --context dev consume app_logs --from-beginning --exit --print-headers
The output excludes debug and trace logs:
#{"level": "info", "message": "Application started"}
#{"level": "error", "message": "Failed to connect to database"}
#{"level": "warn", "message": "High memory usage detected"}
The developer user only sees the logs relevant to their work, while the verbose debug and trace logs are automatically filtered out.
Cleanup
Clean up Kong Event Gateway resources
When you’re done experimenting with this example, clean up the resources:
- If you created a new Event Gateway control plane and want to conserve your free trial credits or avoid unnecessary charges, delete the new control plane used in this tutorial.
- Stop and remove the containers:
docker compose down
This will stop all services and remove the containers, but preserve your configuration files for future use.