kafkactl -C kafkactl.yaml --context direct create topic analytics_pageviews
kafkactl -C kafkactl.yaml --context direct create topic analytics_clicks
kafkactl -C kafkactl.yaml --context direct create topic analytics_orders
kafkactl -C kafkactl.yaml --context direct create topic payments_transactions
kafkactl -C kafkactl.yaml --context direct create topic payments_refunds
kafkactl -C kafkactl.yaml --context direct create topic payments_orders
kafkactl -C kafkactl.yaml --context direct create topic user_actions
Productize Kafka topics with Kong Event Gateway
If your Kafka topics follow a naming convention with prefixes, you can easily organize them into categories with Kong Event Gateway:
- Create a virtual cluster for each product with a namespace based on the topic prefix.
- Create a listener with a forwarding policy for each virtual cluster.
- Create ACL policies to define access permissions to the virtual clusters.
Prerequisites
Install kafkactl
Install kafkactl. You’ll need it to interact with Kafka clusters.
Define a context for kafkactl
Let’s define a context we can use to create Kafka topics.
cat <<EOF > kafkactl.yaml
contexts:
direct:
brokers:
- localhost:9095
- localhost:9096
- localhost:9094
EOF
Start a local Kafka cluster
Start a Docker Compose cluster with multiple Kafka services.
First, we need to create a docker-compose.yaml file. This file will define the services we want to run in our local environment:
cat <<EOF > docker-compose.yaml
name: kafka_cluster
networks:
kafka:
name: kafka_event_gateway
services:
kafka1:
image: apache/kafka:3.7.0
networks:
- kafka
container_name: kafka1
ports:
- "9094:9094"
environment:
KAFKA_NODE_ID: 0
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENERS: INTERNAL://kafka1:9092,CONTROLLER://kafka1:9093,EXTERNAL://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
kafka2:
image: apache/kafka:3.7.0
networks:
- kafka
container_name: kafka2
ports:
- "9095:9095"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENERS: INTERNAL://kafka2:9092,CONTROLLER://kafka2:9093,EXTERNAL://0.0.0.0:9095
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:9095
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
kafka3:
image: apache/kafka:3.7.0
networks:
- kafka
container_name: kafka3
ports:
- "9096:9096"
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENERS: INTERNAL://kafka3:9092,CONTROLLER://kafka3:9093,EXTERNAL://0.0.0.0:9096
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:9096
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
EOF
Now, let’s start the local setup:
docker compose up -d
Kong Konnect
If you don’t have a Konnect account, you can get started quickly with our onboarding wizard.
- The following Konnect items are required to complete this tutorial:
- Personal access token (PAT): Create a new personal access token by opening the Konnect PAT page and selecting Generate Token.
-
Set the personal access token as an environment variable:
export KONNECT_TOKEN='YOUR KONNECT TOKEN'Copied!
Kong Event Gateway running
Run the quickstart script to automatically provision a demo Kong Gateway control plane and data plane, and configure your environment:
curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway
This sets up an Kong Gateway control plane named event-gateway-quickstart, provisions a local data plane, and prints out the following environment variable export:
export EVENT_GATEWAY_ID=your-gateway-id
Copy and paste this into your terminal to configure your session.
This quickstart script is meant for demo purposes only, therefore it runs locally with default parameters and a small number of exposed ports. If you want to run Kong Gateway as a part of a production-ready platform, set up your control plane and data planes through the Konnect UI, or using Terraform.
Create kafka topics
In this guide, we’ll use namespaces and ACL policies to create products from Kafka topics. We have seven Kafka topics that we’ll organize into two categories:
analytics_pageviewsanalytics_clicksanalytics_orderspayments_transactionspayments_refundspayments_ordersuser_actions
We’ll have an analytics category and a payments category, and both of these will also include the user_actions topic.
First, we need to create these sample topics in the Kafka cluster we created in the prerequisites:
Create a backend cluster
Use the following command to create a backend cluster that connects to the Kafka servers you set up:
BACKEND_CLUSTER_ID=$( curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "backend_cluster",
"bootstrap_servers": [
"kafka1:9092",
"kafka2:9092",
"kafka3:9092"
],
"authentication": {
"type": "anonymous"
},
"tls": {
"enabled": false
}
}' | jq -r ".id")
Create an analytics virtual cluster
Use the following command to create the first virtual cluster for the analytics category:
ANALYTICS_VC_ID=$( curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "analytics_vc",
"destination": {
"id": "'$BACKEND_CLUSTER_ID'"
},
"dns_label": "analytics",
"authentication": [
{
"type": "sasl_plain",
"mediation": "terminate",
"principals": [
{
"username": "analytics_user",
"password": "analytics_password"
}
]
}
],
"acl_mode": "enforce_on_gateway",
"namespace": {
"prefix": "analytics_",
"mode": "hide_prefix",
"additional": {
"topics": [
{
"type": "exact_list",
"conflict": "warn",
"exact_list": [
{
"backend": "user_actions"
}
]
}
]
}
}
}' | jq -r ".id")
This virtual cluster provides access to topics with the analytics_ prefix, and the user_actions topic.
Create a payments virtual cluster
Now create the payments virtual cluster:
PAYMENTS_VC_ID=$( curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "payments_vc",
"destination": {
"id": "'$BACKEND_CLUSTER_ID'"
},
"dns_label": "payments",
"authentication": [
{
"type": "sasl_plain",
"mediation": "terminate",
"principals": [
{
"username": "payments_user",
"password": "payments_password"
}
]
}
],
"acl_mode": "enforce_on_gateway",
"namespace": {
"prefix": "payments_",
"mode": "hide_prefix",
"additional": {
"topics": [
{
"type": "exact_list",
"conflict": "warn",
"exact_list": [
{
"backend": "user_actions"
}
]
}
]
}
}
}' | jq -r ".id")
This virtual cluster will be used to access topics with the payments_ prefix, and the user_actions topic.
Create an analytics listener with a policy
For testing purposes, we’ll use port forwarding to route traffic to each virtual cluster.
In production environments, you should use SNI routing instead.
Use the following command to create the listener for the analytics category:
ANALYTICS_LISTENER_ID=$( curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "analytics_listener",
"addresses": [
"0.0.0.0"
],
"ports": [
"19092-19095"
]
}' | jq -r ".id")
Create the port mapping policy:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$ANALYTICS_LISTENER_ID/policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "forward_to_virtual_cluster",
"name": "forward_to_analytics_virtual_cluster",
"config": {
"type": "port_mapping",
"advertised_host": "localhost",
"destination": {
"id": "'$ANALYTICS_VC_ID'"
}
}
}'
Create a payments listener with a policy
Use the following command to create the listener for the payments category:
PAYMENTS_LISTENER_ID=$( curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"name": "payments_listener",
"addresses": [
"0.0.0.0"
],
"ports": [
"19096-19099"
]
}' | jq -r ".id")
Create the port mapping policy:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$PAYMENTS_LISTENER_ID/policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "forward_to_virtual_cluster",
"name": "forward_to_payments_virtual_cluster",
"config": {
"type": "port_mapping",
"advertised_host": "localhost",
"destination": {
"id": "'$PAYMENTS_VC_ID'"
}
}
}'
Create an ACLs policy for the analytics virtual cluster
Finally, we need to set ACL policies for both virtual clusters to allow access to the topics.
Use the following command to add an ACL policy that allows producing and consuming on the analytics_* and user_actions topics on the analytics virtual cluster:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$ANALYTICS_VC_ID/cluster-policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "acls",
"name": "analytics_acl_topic_policy",
"config": {
"rules": [
{
"resource_type": "topic",
"action": "allow",
"operations": [
{
"name": "describe"
},
{
"name": "describe_configs"
},
{
"name": "read"
},
{
"name": "write"
}
],
"resource_names": [
{
"match": "*"
}
]
}
]
}
}'
Create an ACLs policy for the payments virtual cluster
Use the following command to add an ACL policy that allows producing and consuming to the payments_* topics, but only consuming from the user_actions topics on the payments virtual cluster:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$PAYMENTS_VC_ID/cluster-policies" \
--no-progress-meter --fail-with-body \
-H "Authorization: Bearer $KONNECT_TOKEN" \
--json '{
"type": "acls",
"name": "payments_acl_topic_policy",
"config": {
"rules": [
{
"resource_type": "topic",
"action": "allow",
"operations": [
{
"name": "describe"
},
{
"name": "describe_configs"
},
{
"name": "read"
},
{
"name": "write"
}
],
"resource_names": [
{
"match": "*"
}
]
},
{
"resource_type": "topic",
"action": "deny",
"operations": [
{
"name": "write"
}
],
"resource_names": [
{
"match": "user_actions"
}
]
}
]
}
}'
Add Kafka configuration
Use the following Kafka configuration to access your Kafka resources from the virtual clusters:
cat <<EOF > namespaced-clusters.yaml
contexts:
analytics:
brokers:
- localhost:19092
sasl:
enabled: true
username: analytics_user
password: analytics_password
payments:
brokers:
- localhost:19096
sasl:
enabled: true
username: payments_user
password: payments_password
EOF
Validate
Get a list of topics from the analytics virtual cluster:
kafkactl -C namespaced-clusters.yaml --context analytics list topics
You should see the following result:
TOPIC PARTITIONS REPLICATION FACTOR
clicks 1 1
orders 1 1
pageviews 1 1
user_actions 1 1
You can access all the topics prefixed with analytics_ and the user_action topic. The analytics_ prefix is hidden since we set the namespace mode to hide_prefix.
Get a list of topics from the payments virtual cluster:
kafkactl -C namespaced-clusters.yaml --context payments list topics
You should see the following result:
TOPIC PARTITIONS REPLICATION FACTOR
orders 1 1
refunds 1 1
transactions 1 1
user_actions 1 1
Now let’s try to write to user_actions:
-
From the
analyticsvirtual cluster:kafkactl -C namespaced-clusters.yaml --context analytics produce user_actions --value='kafka record'Copied!You should get the following result:
message produced (partition=0 offset=0) -
From the
paymentsvirtual cluster:kafkactl -C namespaced-clusters.yaml --context payments produce user_actions --value='kafka record'Copied!Since we denied write access to the
user_actionstopic from thepaymentsvirtual cluster, you should get the following result:Failed to produce message: kafka server: The client is not authorized to access this topic