Validate Avro messages with Confluent Schema Registry

TL;DR
  1. Register an Avro schema in the Confluent Schema Registry.
  2. Create a Schema Registry entity in Event Gateway pointing to the registry.
  3. Create a Schema Validation produce policy with the confluent_schema_registry type and reject action.

Prerequisites

Install kafkactl. You’ll need it to interact with Kafka clusters.

Start a Docker Compose cluster with multiple Kafka services.

First, we need to create a docker-compose.yaml file. This file will define the services we want to run in our local environment:

cat <<EOF > docker-compose.yaml
name: kafka_cluster

networks:
  kafka:
    name: kafka_event_gateway

services:
  kafka1:
    image: apache/kafka:4.2.0
    networks:
      - kafka
    container_name: kafka1
    ports:
      - "9094:9094"
    environment:
      KAFKA_NODE_ID: 0
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka1:9092,CONTROLLER://kafka1:9093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:9092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs

  kafka2:
    image: apache/kafka:4.2.0
    networks:
      - kafka
    container_name: kafka2
    ports:
      - "9095:9095"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka2:9092,CONTROLLER://kafka2:9093,EXTERNAL://0.0.0.0:9095
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:9092,EXTERNAL://localhost:9095
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs

  kafka3:
    image: apache/kafka:4.2.0
    networks:
      - kafka
    container_name: kafka3
    ports:
      - "9096:9096"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENERS: INTERNAL://kafka3:9092,CONTROLLER://kafka3:9093,EXTERNAL://0.0.0.0:9096
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:9092,EXTERNAL://localhost:9096
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_QUORUM_VOTERS: 0@kafka1:9093,1@kafka2:9093,2@kafka3:9093
      KAFKA_CLUSTER_ID: 'abcdefghijklmnopqrstuv'
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs

  schema-registry:
    image: confluentinc/cp-schema-registry:7.8.7
    networks:
      - kafka
    container_name: schema-registry
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092

EOF

Now, let’s start the local setup:

docker compose up -d

If you don’t have a Konnect account, you can get started quickly with our onboarding wizard.

  1. The following Konnect items are required to complete this tutorial:
    • Personal access token (PAT): Create a new personal access token by opening the Konnect PAT page and selecting Generate Token.
  2. Set the personal access token as an environment variable:

    export KONNECT_TOKEN='YOUR KONNECT TOKEN'
    

Run the quickstart script to automatically provision a demo Kong Gateway control plane and data plane, and configure your environment:

curl -Ls https://get.konghq.com/event-gateway | bash -s -- -k $KONNECT_TOKEN -N kafka_event_gateway

This sets up a Kong Gateway control plane named event-gateway-quickstart, provisions a local data plane, and prints out the following environment variable export:

export EVENT_GATEWAY_ID=your-gateway-id

Copy and paste the command with your Event Gateway ID into your terminal to configure your session.

This quickstart script is meant for demo purposes only, so it runs locally with mostly default parameters and a small number of exposed ports. If you want to run Kong Gateway as part of a production-ready platform, set up your control plane and data planes through the Konnect UI, or using Terraform.

Overview

In this guide, you’ll learn how to enforce Avro schema validation on messages produced through Event Gateway using the Confluent Schema Registry.

We’ll use an application logging scenario where producers send log events to an app_logs topic. Each log event must conform to an Avro schema with level and message fields. The Event Gateway Schema Validation policy validates every produced message against the schema registered in the Confluent Schema Registry, and rejects messages that don’t conform.
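The policy's decision boils down to: look up the topic's registered schema, attempt to decode the message against it, and reject (or mark) the message on failure. The sketch below illustrates that decision with a plain field-and-type check on JSON-like data; it is not Event Gateway's implementation, which decodes Avro binary against the fetched schema. The field names mirror the tutorial's AppLog schema:

```python
# Mirrors the AppLog Avro schema registered later in this tutorial.
REQUIRED_FIELDS = {"level": str, "message": str}

def validate(event: dict) -> bool:
    """Return True if the event has exactly the expected fields and types."""
    if set(event) != set(REQUIRED_FIELDS):
        return False
    return all(isinstance(event[name], t) for name, t in REQUIRED_FIELDS.items())

print(validate({"level": "info", "message": "Application started"}))     # True
print(validate({"severity": "info", "message": "Application started"}))  # False: unknown field
```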

Here’s how the data flows through the system:

 
flowchart LR
    P[Producer] --> EG

    subgraph EG [Event Gateway]
        SV{Schema Validation<br/>Valid Avro?}
        SV -->|Yes| K[Kafka Broker]
        SV -->|No| R[Reject]
    end

    K --> C[Consumer]

Create a backend cluster

Use the following command to create a backend cluster that connects to the Kafka servers you set up:

BACKEND_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/backend-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "backend_cluster",
       "bootstrap_servers": [
         "kafka1:9092",
         "kafka2:9092",
         "kafka3:9092"
       ],
       "authentication": {
         "type": "anonymous"
       },
       "tls": {
         "enabled": false
       },
       "insecure_allow_anonymous_virtual_cluster_auth": true
     }' | jq -r ".id"
)

Create a virtual cluster

Create a virtual cluster with anonymous authentication:

VIRTUAL_CLUSTER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "logs_vc",
       "destination": {
         "id": "'$BACKEND_CLUSTER_ID'"
       },
       "dns_label": "logs",
       "authentication": [
         {
           "type": "anonymous"
         }
       ],
       "acl_mode": "passthrough"
     }' | jq -r ".id"
)

Create a listener with a forwarding policy

Create a listener to accept connections:

LISTENER_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "logs_listener",
       "addresses": [
         "0.0.0.0"
       ],
       "ports": [
         "19092-19095"
       ]
     }' | jq -r ".id"
)

Create a port mapping policy to forward traffic to the virtual cluster:

curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/listeners/$LISTENER_ID/policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "forward_to_virtual_cluster",
       "name": "forward_to_logs_vc",
       "config": {
         "type": "port_mapping",
         "advertised_host": "localhost",
         "destination": {
           "id": "'$VIRTUAL_CLUSTER_ID'"
         }
       }
     }'

Create a Schema Registry entity

Create a Schema Registry entity in Event Gateway that points to the Confluent Schema Registry running locally. Since the Event Gateway data plane runs in the same Docker network as the Schema Registry, use the container hostname schema-registry:

SCHEMA_REGISTRY_ID=$(curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/schema-registries" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "name": "local-schema-registry",
       "type": "confluent",
       "config": {
         "schema_type": "avro",
         "endpoint": "http://schema-registry:8081",
         "timeout_seconds": 10
       }
     }' | jq -r ".id"
)

Create a Schema Validation produce policy

Create a Schema Validation policy that validates all produced messages against the Confluent Schema Registry. Messages that don’t conform to the registered Avro schema are rejected:

curl -X POST "https://us.api.konghq.com/v1/event-gateways/$EVENT_GATEWAY_ID/virtual-clusters/$VIRTUAL_CLUSTER_ID/produce-policies" \
     --no-progress-meter --fail-with-body  \
     -H "Authorization: Bearer $KONNECT_TOKEN" \
     --json '{
       "type": "schema_validation",
       "name": "validate_avro",
       "config": {
         "type": "confluent_schema_registry",
         "schema_registry": {
           "name": "local-schema-registry"
         },
         "value_validation_action": "reject"
       }
     }'

The value_validation_action: reject setting ensures that the entire batch containing an invalid message is rejected, and the producer receives an error. Alternatively, you can use mark, which passes the message to the broker but adds a kong/sverr-value header to flag it as invalid.
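For comparison, the same policy with the mark action would use a config like the following (a sketch adapted from the reject example above, using the same field names; only value_validation_action changes):

```json
{
  "type": "schema_validation",
  "name": "mark_invalid_avro",
  "config": {
    "type": "confluent_schema_registry",
    "schema_registry": {
      "name": "local-schema-registry"
    },
    "value_validation_action": "mark"
  }
}
```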

Configure kafkactl

Create a kafkactl configuration with a context that connects through the virtual cluster, with Schema Registry configured for Avro serialization:

cat <<EOF > avro-cluster.yaml
contexts:
  vc:
    brokers:
      - localhost:19092
    schemaRegistry:
      url: http://localhost:8081
EOF

Create a Kafka topic

Create an app_logs topic through the virtual cluster:

kafkactl -C avro-cluster.yaml --context vc create topic app_logs

Register an Avro schema

Register an Avro schema for the app_logs topic in the Confluent Schema Registry. The schema defines two fields: level (the log severity) and message (the log content):

curl -sS --fail -X POST http://localhost:8081/subjects/app_logs-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "{\"type\": \"record\", \"name\": \"AppLog\", \"namespace\": \"com.example\", \"fields\": [{\"name\": \"level\", \"type\": \"string\"}, {\"name\": \"message\", \"type\": \"string\"}]}"}'

The subject name app_logs-value follows Confluent’s default TopicNameStrategy, which uses the pattern <topic>-value.
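Avro-aware clients like kafkactl don't send raw Avro bytes: they use Confluent's wire format, which prefixes the Avro payload with a magic byte (0) and the 4-byte big-endian schema ID. That ID is how a validator knows which registered schema to fetch. A minimal sketch of this framing (the schema ID 1 and the payload bytes are illustrative):

```python
import struct

def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro-encoded payload with the Confluent wire-format header."""
    # Magic byte 0, then the schema ID as a 4-byte big-endian unsigned integer.
    return struct.pack(">bI", 0, schema_id) + avro_payload

def parse_header(message: bytes) -> tuple[int, bytes]:
    """Split a wire-format message into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("not Confluent wire format")
    return schema_id, message[5:]

msg = frame(1, b"\x08info...")  # payload bytes are illustrative, not real Avro
schema_id, payload = parse_header(msg)
print(schema_id)  # 1
```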

Validate

Use the following steps to make sure everything was set up correctly.

Produce a valid Avro message

Produce a log event that conforms to the registered Avro schema:

kafkactl -C avro-cluster.yaml --context vc produce app_logs \
  --value='{"level": "info", "message": "Application started"}'

The message is serialized as Avro by kafkactl using the schema from the registry, validated by Event Gateway, and accepted:

message produced (partition=0	offset=0)

Produce a message that doesn’t match the schema

Try to produce a valid JSON message that doesn’t conform to the registered Avro schema. This message has a severity field instead of level:

kafkactl -C avro-cluster.yaml --context vc produce app_logs \
  --value='{"severity": "info", "message": "Application started"}'

The message is rejected by Event Gateway because it doesn’t match the registered Avro schema:

Failed to produce message: failed to convert value to avro data: cannot decode textual record "com.example.AppLog": cannot decode textual map: cannot determine codec: "severity"

Consume the validated messages

Consume the messages from the app_logs topic to verify that only the valid Avro message was accepted. The --print-schema flag displays the Avro schema used for deserialization:

kafkactl -C avro-cluster.yaml --context vc consume app_logs --from-beginning --exit --print-schema

The output shows the valid message that passed schema validation, along with its Avro schema:

##{"type":"record","name":"AppLog","namespace":"com.example","fields":[{"name":"level","type":"string"},{"name":"message","type":"string"}]}#1#{"level":"info","message":"Application started"}

The Schema Validation policy ensures that only properly serialized Avro messages that match the registered schema reach your Kafka brokers.

Cleanup

When you’re done experimenting with this example, clean up the resources:

  1. If you created a new Event Gateway control plane and want to conserve your free trial credits or avoid unnecessary charges, delete the new control plane used in this tutorial.

  2. Stop and remove the containers:

    docker compose down
    

This will stop all services and remove the containers, but preserve your configuration files for future use.
