Validate Avro messages with Confluent Schema Registry
- Register an Avro schema in the Confluent Schema Registry.
- Create a Schema Registry entity in Event Gateway pointing to the registry.
- Create a Schema Validation produce policy with the confluent_schema_registry type and the reject action.
Prerequisites
Install kafkactl
Install kafkactl. You’ll need it to interact with Kafka clusters.
Start a local Kafka cluster
Start a Docker Compose cluster with multiple Kafka services.
First, we need to create a docker-compose.yaml file. This file will define the services we want to run in our local environment:
cat <<EOF > docker-compose.yaml
name: kafka_cluster
networks:
  kafka:
    name: kafka_event_gateway
services:
  kafka1:
    image: apache/kafka:4.2.0
    networks:
      - kafka
    container_name: kafka1
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka1:9092,CONTROLLER://kafka1:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
  kafka2:
    image: apache/kafka:4.2.0
    networks:
      - kafka
    container_name: kafka2
    ports:
      - "9095:9095"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka2:9092,CONTROLLER://kafka2:9093,EXTERNAL://0.0.0.0:9095
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
  kafka3:
    image: apache/kafka:4.2.0
    networks:
      - kafka
    container_name: kafka3
    ports:
      - "9096:9096"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka3:9092,CONTROLLER://kafka3:9093,EXTERNAL://0.0.0.0:9096
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:9096
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
  schema-registry:
    image: confluentinc/cp-schema-registry:7.8.7
    networks:
      - kafka
    container_name: schema-registry
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
EOF
Now, let’s start the local setup:
docker compose up -d
Kong Konnect
If you don’t have a Konnect account, you can get started quickly with our onboarding wizard.
The following Konnect items are required to complete this tutorial:
- Personal access token (PAT): Create a new personal access token by opening the Konnect PAT page and selecting Generate Token.
- Set the personal access token as an environment variable:
export KONNECT_TOKEN='YOUR KONNECT TOKEN'
Kong Event Gateway running
Run the quickstart script to automatically provision a demo Kong Gateway control plane and data plane, and configure your environment:
curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway
This sets up a Kong Gateway control plane named event-gateway-quickstart, provisions a local data plane, and prints the following environment variable export:
export EVENT_GATEWAY_ID=your-gateway-id
Copy and paste the command with your Event Gateway ID into your terminal to configure your session.
This quickstart script is meant for demo purposes only: it runs locally with mostly default parameters and a small number of exposed ports. To run Kong Gateway as part of a production-ready platform, set up your control plane and data planes through the Konnect UI or with Terraform.
Overview
In this guide, you’ll learn how to enforce Avro schema validation on messages produced through Event Gateway using the Confluent Schema Registry.
We’ll use an application logging scenario where producers send log events to an app_logs topic. Each log event must conform to an Avro schema with level and message fields. The Event Gateway Schema Validation policy validates every produced message against the schema registered in the Confluent Schema Registry, and rejects messages that don’t conform.
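Concretely, the Avro schema we'll register for this topic looks like this (shown unescaped here for readability; it's the same schema that gets registered later in this guide):

```json
{
  "type": "record",
  "name": "AppLog",
  "namespace": "com.example",
  "fields": [
    {"name": "level", "type": "string"},
    {"name": "message", "type": "string"}
  ]
}
```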
Here’s how the data flows through the system:
flowchart LR
  P[Producer] --> EG
  subgraph EG [Event Gateway]
    SV{Schema Validation:<br/>Valid Avro?}
    SV -->|Yes| K[Kafka Broker]
    SV -->|No| R[Reject]
  end
  K --> C[Consumer]
Create a backend cluster
Use the following command to create a backend cluster that connects to the Kafka servers you set up:
BACKEND_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters" \
  --no-progress-meter --fail-with-body \
  -H "Authorization: Bearer $KONNECT_TOKEN" \
  --json '{
    "name": "backend_cluster",
    "bootstrap_servers": [
      "kafka1:9092",
      "kafka2:9092",
      "kafka3:9092"
    ],
    "authentication": {
      "type": "anonymous"
    },
    "tls": {
      "enabled": false
    },
    "insecure_allow_anonymous_virtual_cluster_auth": true
  }' | jq -r ".id"
)
Create a virtual cluster
Create a virtual cluster with anonymous authentication:
VIRTUAL_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
  --no-progress-meter --fail-with-body \
  -H "Authorization: Bearer $KONNECT_TOKEN" \
  --json '{
    "name": "logs_vc",
    "destination": {
      "id": "'$BACKEND_CLUSTER_ID'"
    },
    "dns_label": "logs",
    "authentication": [
      {
        "type": "anonymous"
      }
    ],
    "acl_mode": "passthrough"
  }' | jq -r ".id"
)
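Each of these create commands pipes the API response through jq -r ".id" to capture the new resource's ID in a shell variable. As an illustration with a mocked response (the id value here is made up, not a real Konnect ID):

```shell
# jq -r prints the raw string value of the "id" field (no quotes),
# which makes it suitable for capture with $(...).
echo '{"id": "bc-123", "name": "backend_cluster"}' | jq -r ".id"
# -> bc-123
```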
Create a listener with a forwarding policy
Create a listener to accept connections:
LISTENER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
  --no-progress-meter --fail-with-body \
  -H "Authorization: Bearer $KONNECT_TOKEN" \
  --json '{
    "name": "logs_listener",
    "addresses": [
      "0.0.0.0"
    ],
    "ports": [
      "19092-19095"
    ]
  }' | jq -r ".id"
)
Create a port mapping policy to forward traffic to the virtual cluster:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies" \
  --no-progress-meter --fail-with-body \
  -H "Authorization: Bearer $KONNECT_TOKEN" \
  --json '{
    "type": "forward_to_virtual_cluster",
    "name": "forward_to_logs_vc",
    "config": {
      "type": "port_mapping",
      "advertised_host": "localhost",
      "destination": {
        "id": "'$VIRTUAL_CLUSTER_ID'"
      }
    }
  }'
Create a Schema Registry entity
Create a Schema Registry entity in Event Gateway that points to the Confluent Schema Registry running locally. Since the Event Gateway data plane runs in the same Docker network as the Schema Registry, use the container hostname schema-registry:
SCHEMA_REGISTRY_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/schema-registries" \
  --no-progress-meter --fail-with-body \
  -H "Authorization: Bearer $KONNECT_TOKEN" \
  --json '{
    "name": "local-schema-registry",
    "type": "confluent",
    "config": {
      "schema_type": "avro",
      "endpoint": "http://schema-registry:8081",
      "timeout_seconds": 10
    }
  }' | jq -r ".id"
)
Create a Schema Validation produce policy
Create a Schema Validation policy that validates all produced messages against the Confluent Schema Registry. Messages that don’t conform to the registered Avro schema are rejected:
curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies" \
  --no-progress-meter --fail-with-body \
  -H "Authorization: Bearer $KONNECT_TOKEN" \
  --json '{
    "type": "schema_validation",
    "name": "validate_avro",
    "config": {
      "type": "confluent_schema_registry",
      "schema_registry": {
        "name": "local-schema-registry"
      },
      "value_validation_action": "reject"
    }
  }'
The value_validation_action: reject setting ensures that the entire batch containing an invalid message is rejected, and the producer receives an error. Alternatively, you can use mark, which passes the message to the broker but adds a kong/sverr-value header to flag it as invalid.
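If flagging is preferable to rejection, the request body is identical except for the last field. A sketch of the mark variant (same endpoint and fields as the command above, only value_validation_action changes):

```json
{
  "type": "schema_validation",
  "name": "validate_avro",
  "config": {
    "type": "confluent_schema_registry",
    "schema_registry": {
      "name": "local-schema-registry"
    },
    "value_validation_action": "mark"
  }
}
```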
Configure kafkactl
Create a kafkactl configuration with a context that connects through the virtual cluster, with Schema Registry configured for Avro serialization:
cat <<EOF > avro-cluster.yaml
contexts:
  vc:
    brokers:
      - localhost:19092
    schemaRegistry:
      url: http://localhost:8081
EOF
Create a Kafka topic
Create an app_logs topic through the virtual cluster:
kafkactl -C avro-cluster.yaml --context vc create topic app_logs
Register an Avro schema
Register an Avro schema for the app_logs topic in the Confluent Schema Registry. The schema defines two fields: level (the log severity) and message (the log content):
curl -sS --fail -X POST http://localhost:8081/subjects/app_logs-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "{\"type\": \"record\", \"name\": \"AppLog\", \"namespace\": \"com.example\", \"fields\": [{\"name\": \"level\", \"type\": \"string\"}, {\"name\": \"message\", \"type\": \"string\"}]}"}'
The subject name app_logs-value follows Confluent’s default TopicNameStrategy, which uses the pattern <topic>-value.
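Deriving the subject name in shell makes the convention explicit (message values use <topic>-value; message keys would use <topic>-key):

```shell
# TopicNameStrategy: the subject for a topic's message values is <topic>-value.
topic=app_logs
echo "${topic}-value"
# -> app_logs-value
```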
Validate
Use the following steps to make sure everything was set up correctly.
Produce a valid Avro message
Produce a log event that conforms to the registered Avro schema:
kafkactl -C avro-cluster.yaml --context vc produce app_logs \
--value='{"level": "info", "message": "Application started"}'
The message is serialized as Avro by kafkactl using the schema from the registry, validated by Event Gateway, and accepted:
message produced (partition=0 offset=0)
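Behind the scenes, kafkactl frames the value in Confluent's wire format: one zero magic byte, a 4-byte big-endian schema ID, then the Avro-encoded body. This framing is how registry-aware tooling identifies which schema a message uses. A quick sketch of the header bytes, assuming the schema above was registered with ID 1 (the registration response reports the actual ID):

```shell
# Confluent wire-format header for schema ID 1:
# magic byte 0x00 followed by the ID 1 as a 4-byte big-endian integer.
printf '\000\000\000\000\001' | od -An -tx1
```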
Produce a message that doesn’t match the schema
Try to produce a valid JSON message that doesn’t conform to the registered Avro schema. This message has a severity field instead of level:
kafkactl -C avro-cluster.yaml --context vc produce app_logs \
--value='{"severity": "info", "message": "Application started"}'
The produce attempt fails because the value doesn't match the registered Avro schema. In this case kafkactl itself can't serialize the value against the schema from the registry, so the message is never produced:
Failed to produce message: failed to convert value to avro data: cannot decode textual record "com.example.AppLog": cannot decode textual map: cannot determine codec: "severity"
Consume the validated messages
Consume the messages from the app_logs topic to verify that only the valid Avro message was accepted. The --print-schema flag displays the Avro schema used for deserialization:
kafkactl -C avro-cluster.yaml --context vc consume app_logs --from-beginning --exit --print-schema
The output shows the valid message that passed schema validation, along with its Avro schema:
##{"type":"record","name":"AppLog","namespace":"com.example","fields":[{"name":"level","type":"string"},{"name":"message","type":"string"}]}#1#{"level":"info","message":"Application started"}
The Schema Validation policy ensures that only properly serialized Avro messages that match the registered schema reach your Kafka brokers.
Cleanup
Clean up Kong Event Gateway resources
When you’re done experimenting with this example, clean up the resources:
- If you created a new Event Gateway control plane and want to conserve your free trial credits or avoid unnecessary charges, delete the new control plane used in this tutorial.
- Stop and remove the containers:
docker compose down
This stops all services and removes the containers, but preserves your configuration files for future use.