Productize Kafka topics with Kong Event Gateway

Incompatible with on-prem
TL;DR

If your Kafka topics follow a naming convention with prefixes, you can easily organize them into categories with Kong Event Gateway:

  1. Create a virtual cluster for each product with a namespace based on the topic prefix.
  2. Create a listener with a forwarding policy for each virtual cluster.
  3. Create ACL policies to define access permissions to the virtual clusters.

Prerequisites

Install kafkactl. You’ll need it to interact with Kafka clusters.

Let’s define a context we can use to create Kafka topics.

cat <<EOF > kafkactl.yaml
contexts:
  direct:
    brokers:
      - localhost:9095
      - localhost:9096
      - localhost:9094
EOF

Start a Docker Compose cluster with multiple Kafka services.

First, we need to create a docker-compose.yaml file. This file will define the services we want to run in our local environment:

cat <<EOF > docker-compose.yaml
name: kafka_cluster

networks:
  kafka:
    name: kafka_event_gateway

services:
  kafka1:
    image: apache/kafka:3.7.0
    networks:
      - kafka
    container_name: kafka1
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka1:9092,CONTROLLER://kafka1:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs

  kafka2:
    image: apache/kafka:3.7.0
    networks:
      - kafka
    container_name: kafka2
    ports:
      - "9095:9095"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka2:9092,CONTROLLER://kafka2:9093,EXTERNAL://0.0.0.0:9095
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs

  kafka3:
    image: apache/kafka:3.7.0
    networks:
      - kafka
    container_name: kafka3
    ports:
      - "9096:9096"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka3:9092,CONTROLLER://kafka3:9093,EXTERNAL://0.0.0.0:9096
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:9096
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
EOF

Now, let’s start the local setup:

docker compose up -d

If you don’t have a Konnect account, you can get started quickly with our onboarding wizard.

  1. Create a personal access token (PAT), the only Konnect item required for this tutorial: open the Konnect PAT page and select Generate Token.
  2. Set the personal access token as an environment variable:

    export KONNECT_TOKEN='YOUR KONNECT TOKEN'
    

Run the quickstart script to automatically provision a demo Kong Gateway control plane and data plane, and configure your environment:

curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway

This sets up a Kong Event Gateway control plane named event-gateway-quickstart, provisions a local data plane, and prints the following environment variable export:

export EVENT_GATEWAY_ID=your-gateway-id

Copy and paste this into your terminal to configure your session.

This quickstart script is meant for demo purposes only: it runs locally with default parameters and a small number of exposed ports. To run Kong Event Gateway as part of a production-ready platform, set up your control plane and data planes through the Konnect UI, or using Terraform.

Create Kafka topics

In this guide, we’ll use namespaces and ACL policies to create products from Kafka topics. We have seven Kafka topics that we’ll organize into two categories:

  • analytics_pageviews
  • analytics_clicks
  • analytics_orders
  • payments_transactions
  • payments_refunds
  • payments_orders
  • user_actions

We’ll have an analytics category and a payments category, and both of these will also include the user_actions topic.
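The grouping above can be sketched as a simple shell filter over the topic names (an illustration only, not part of the tutorial's required steps):

```shell
# Illustration: classify each topic by its prefix; user_actions is the
# shared topic that will appear in both virtual clusters.
category() {
  case "$1" in
    analytics_*)  echo "analytics" ;;
    payments_*)   echo "payments" ;;
    user_actions) echo "shared" ;;
  esac
}
for t in analytics_pageviews analytics_clicks analytics_orders \
         payments_transactions payments_refunds payments_orders user_actions; do
  printf '%-22s -> %s\n' "$t" "$(category "$t")"
done
```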

First, we need to create these sample topics in the Kafka cluster we created in the prerequisites:

kafkactl -C kafkactl.yaml --context direct create topic analytics_pageviews
kafkactl -C kafkactl.yaml --context direct create topic analytics_clicks
kafkactl -C kafkactl.yaml --context direct create topic analytics_orders
kafkactl -C kafkactl.yaml --context direct create topic payments_transactions
kafkactl -C kafkactl.yaml --context direct create topic payments_refunds
kafkactl -C kafkactl.yaml --context direct create topic payments_orders
kafkactl -C kafkactl.yaml --context direct create topic user_actions

Create a backend cluster

Use the following command to create a backend cluster that connects to the Kafka servers you set up:

BACKEND_CLUSTER_ID=$( curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "backend_cluster",
       "bootstrap_servers": [
         "kafka1:9092",
         "kafka2:9092",
         "kafka3:9092"
       ],
       "authentication": {
         "type": "anonymous"
       },
       "tls": {
         "enabled": false
       }
     }' | jq -r ".id")
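The command captures the new cluster's ID with jq. A quick sanity check catches the common failure mode where the request failed and jq captured the literal string "null". The `check_id` helper below is our own, hypothetical convenience function for this tutorial, not part of the Konnect API:

```shell
# Hypothetical helper (not a Konnect API call): verify that the jq capture
# produced a real ID. jq prints the literal string "null" when the response
# body has no "id" field (for example, after an authentication error).
check_id() {
  if [ -z "$2" ] || [ "$2" = "null" ]; then
    echo "$1: creation failed, inspect the curl response" >&2
    return 1
  fi
  echo "$1=$2"
}
check_id BACKEND_CLUSTER_ID "$BACKEND_CLUSTER_ID" || echo "re-run the create call above"
```

The same check works for the virtual cluster and listener IDs captured in the next steps.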

Create an analytics virtual cluster

Use the following command to create the first virtual cluster for the analytics category:

ANALYTICS_VC_ID=$( curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "analytics_vc",
       "destination": {
         "id": "'$BACKEND_CLUSTER_ID'"
       },
       "dns_label": "analytics",
       "authentication": [
         {
           "type": "sasl_plain",
           "mediation": "terminate",
           "principals": [
             {
               "username": "analytics_user",
               "password": "analytics_password"
             }
           ]
         }
       ],
       "acl_mode": "enforce_on_gateway",
       "namespace": {
         "prefix": "analytics_",
         "mode": "hide_prefix",
         "additional": {
           "topics": [
             {
               "type": "exact_list",
               "conflict": "warn",
               "exact_list": [
                 {
                   "backend": "user_actions"
                 }
               ]
             }
           ]
         }
       }
     }' | jq -r ".id")

This virtual cluster provides access to topics with the analytics_ prefix, and the user_actions topic.
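To make the namespace behavior concrete, here is the mapping rule in plain shell (an illustration of hide_prefix, not gateway code): clients address topics without the prefix, the gateway restores it on the way to the backend, and topics exposed via the additional exact_list pass through unchanged:

```shell
# Illustration only (not Kong code): how hide_prefix maps the topic name a
# client uses on the analytics virtual cluster to the backend topic name.
PREFIX="analytics_"
to_backend() {
  case "$1" in
    user_actions) echo "user_actions" ;;  # exposed via the additional exact_list
    *) echo "${PREFIX}$1" ;;              # prefix is restored for namespaced topics
  esac
}
to_backend pageviews     # -> analytics_pageviews
to_backend user_actions  # -> user_actions
```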

Create a payments virtual cluster

Now create the payments virtual cluster:

PAYMENTS_VC_ID=$( curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "payments_vc",
       "destination": {
         "id": "'$BACKEND_CLUSTER_ID'"
       },
       "dns_label": "payments",
       "authentication": [
         {
           "type": "sasl_plain",
           "mediation": "terminate",
           "principals": [
             {
               "username": "payments_user",
               "password": "payments_password"
             }
           ]
         }
       ],
       "acl_mode": "enforce_on_gateway",
       "namespace": {
         "prefix": "payments_",
         "mode": "hide_prefix",
         "additional": {
           "topics": [
             {
               "type": "exact_list",
               "conflict": "warn",
               "exact_list": [
                 {
                   "backend": "user_actions"
                 }
               ]
             }
           ]
         }
       }
     }' | jq -r ".id")

This virtual cluster will be used to access topics with the payments_ prefix, and the user_actions topic.

Create an analytics listener with a policy

For testing purposes, we’ll use port forwarding to route traffic to each virtual cluster.
In production environments, you should use SNI routing instead.

Use the following command to create the listener for the analytics category:

ANALYTICS_LISTENER_ID=$( curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "analytics_listener",
       "addresses": [
         "0.0.0.0"
       ],
       "ports": [
         "19092-19095"
       ]
     }' | jq -r ".id")

Create the port mapping policy:

 curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$ANALYTICS_LISTENER_ID/policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "forward_to_virtual_cluster",
       "name": "forward_to_analytics_virtual_cluster",
       "config": {
         "type": "port_mapping",
         "advertised_host": "localhost",
         "destination": {
           "id": "'$ANALYTICS_VC_ID'"
         }
       }
     }'

Create a payments listener with a policy

Use the following command to create the listener for the payments category:

PAYMENTS_LISTENER_ID=$( curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "payments_listener",
       "addresses": [
         "0.0.0.0"
       ],
       "ports": [
         "19096-19099"
       ]
     }' | jq -r ".id")

Create the port mapping policy:

 curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$PAYMENTS_LISTENER_ID/policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "forward_to_virtual_cluster",
       "name": "forward_to_payments_virtual_cluster",
       "config": {
         "type": "port_mapping",
         "advertised_host": "localhost",
         "destination": {
           "id": "'$PAYMENTS_VC_ID'"
         }
       }
     }'
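Why a range of four ports per listener? Our working assumption, purely for illustration (the port_mapping internals are not documented in this guide), is that the gateway advertises each backend broker on its own port, with the first port in the range serving as the bootstrap address clients connect to; this matches the localhost:19096 broker address used in the kafkactl configuration later:

```shell
# Assumption for illustration, not an official description of port_mapping:
# one advertised port per backend broker, plus the first port of the range
# for bootstrap. Three brokers therefore fit in a four-port range.
FIRST_PORT=19096
port_for_broker() { echo $((FIRST_PORT + 1 + $1)); }
echo "bootstrap -> localhost:$FIRST_PORT"
for i in 0 1 2; do
  echo "broker $i -> localhost:$(port_for_broker $i)"
done
```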

Create an ACLs policy for the analytics virtual cluster

Finally, we need to set ACL policies for both virtual clusters to allow access to the topics.

Use the following command to add an ACL policy that allows producing and consuming on the analytics_* and user_actions topics on the analytics virtual cluster:

 curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$ANALYTICS_VC_ID/cluster-policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "acls",
       "name": "analytics_acl_topic_policy",
       "config": {
         "rules": [
           {
             "resource_type": "topic",
             "action": "allow",
             "operations": [
               {
                 "name": "describe"
               },
               {
                 "name": "describe_configs"
               },
               {
                 "name": "read"
               },
               {
                 "name": "write"
               }
             ],
             "resource_names": [
               {
                 "match": "*"
               }
             ]
           }
         ]
       }
     }'

Create an ACLs policy for the payments virtual cluster

Use the following command to add an ACL policy that allows producing and consuming on the payments_* topics, but only consuming from the user_actions topic on the payments virtual cluster:

 curl  -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$PAYMENTS_VC_ID/cluster-policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "acls",
       "name": "payments_acl_topic_policy",
       "config": {
         "rules": [
           {
             "resource_type": "topic",
             "action": "allow",
             "operations": [
               {
                 "name": "describe"
               },
               {
                 "name": "describe_configs"
               },
               {
                 "name": "read"
               },
               {
                 "name": "write"
               }
             ],
             "resource_names": [
               {
                 "match": "*"
               }
             ]
           },
           {
             "resource_type": "topic",
             "action": "deny",
             "operations": [
               {
                 "name": "write"
               }
             ],
             "resource_names": [
               {
                 "match": "user_actions"
               }
             ]
           }
         ]
       }
     }'
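The net effect of these two rules can be sketched as a tiny decision function (our own illustration, not the gateway's actual evaluation code): the explicit deny on user_actions wins over the wildcard allow for write operations, which is exactly what the Validate section exercises:

```shell
# Illustration only: the combined effect of the wildcard allow rule and the
# deny rule above, for a write request on the payments virtual cluster.
can_write() {
  case "$1" in
    user_actions) echo "deny" ;;  # explicit deny rule on write
    *) echo "allow" ;;            # wildcard allow rule
  esac
}
can_write transactions  # -> allow
can_write user_actions  # -> deny
```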

Add Kafka configuration

Use the following kafkactl configuration to access your Kafka resources through the virtual clusters:

cat <<EOF > namespaced-clusters.yaml
contexts:
  analytics:
    brokers:
      - localhost:19092
    sasl:
      enabled: true
      username: analytics_user
      password: analytics_password
  payments:
    brokers:
      - localhost:19096
    sasl:
      enabled: true
      username: payments_user
      password: payments_password
EOF

Validate

Get a list of topics from the analytics virtual cluster:

kafkactl -C namespaced-clusters.yaml --context analytics list topics

You should see the following result:

TOPIC            PARTITIONS     REPLICATION FACTOR
clicks           1              1
orders           1              1
pageviews        1              1
user_actions     1              1

You can access all the topics prefixed with analytics_ and the user_actions topic. The analytics_ prefix is hidden because we set the namespace mode to hide_prefix.

Get a list of topics from the payments virtual cluster:

kafkactl -C namespaced-clusters.yaml --context payments list topics

You should see the following result:

TOPIC            PARTITIONS     REPLICATION FACTOR
orders           1              1
refunds          1              1
transactions     1              1
user_actions     1              1

Now let’s try to write to user_actions:

  1. From the analytics virtual cluster:

    kafkactl -C namespaced-clusters.yaml --context analytics produce user_actions --value='kafka record'
    

    You should get the following result:

    message produced (partition=0 offset=0)
    
  2. From the payments virtual cluster:

    kafkactl -C namespaced-clusters.yaml --context payments produce user_actions --value='kafka record'
    

    Since we denied write access to the user_actions topic from the payments virtual cluster, you should get the following result:

    Failed to produce message: kafka server: The client is not authorized to access this topic
    