Schema Validation Consume

Incompatible with: on-prem

This policy validates messages against a provided schema during the consume phase, before they reach the consuming client.

Common use cases for the Schema Validation Consume policy:

  • Validate messages against a Confluent Schema Registry: Ensure that all messages consumed from a topic are validated against a Confluent Schema Registry, and skip messages that don’t conform.
  • Validate messages for a subset of topics as JSON: Ensure that all messages consumed from topics with a specific prefix are valid JSON, and skip messages that don’t conform.
  • Validate messages for a topic: Ensure that all messages consumed from a topic are validated against a schema, and reject messages that don’t conform.

How it works

This policy runs in the consume phase.

Schema validation is applied as follows:

  1. A Kafka client sends Event Gateway a request to consume messages.
  2. Event Gateway forwards the request to a Kafka broker.
  3. The Kafka broker returns a response containing the messages to Event Gateway.
  4. If the schema isn’t already cached, Event Gateway fetches it from the schema registry. Event Gateway then validates the payload against the schema.
    • If the message passes validation, it is passed back to the client.
    • If the message fails validation, it is either dropped (skip) or marked as invalid and passed to the client (mark), depending on the configured failure mode.
 
sequenceDiagram
  autonumber
  participant client as Client
  participant egw as Event Gateway
  participant schema as Schema registry
  participant broker as Kafka broker

  client->>egw: Fetch new messages
  egw->>broker: Forward
  broker->>egw: Deliver message to consume

  opt Schema not cached
    egw->>schema: Fetch schema
    schema-->>egw: Return schema
  end

  egw->>egw: Validate payload against schema

  alt Validation passed
    egw->>client: Forward
  else Validation failed
    alt Failure mode: skip
      egw->>egw: Drop message
    else Failure mode: mark
      egw->>client: Forward (marked invalid)
    end
  end
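The caching step in the diagram (the registry is contacted only when the schema isn’t cached yet) can be sketched as follows. This is a minimal Python illustration, not Event Gateway’s implementation: the `CachingValidator` class, keying the cache by schema ID, and the toy `toy_validate` check (which treats a schema as a list of required JSON fields) are all hypothetical.

```python
import json
from typing import Callable, Dict


class CachingValidator:
    """Hypothetical sketch: contact the schema registry only on a cache miss."""

    def __init__(self, fetch_schema: Callable[[int], dict],
                 validate: Callable[[dict, bytes], bool]) -> None:
        self._fetch_schema = fetch_schema  # stands in for a registry lookup
        self._validate = validate          # stands in for the real schema check
        self._cache: Dict[int, dict] = {}
        self.registry_calls = 0

    def check(self, schema_id: int, payload: bytes) -> bool:
        schema = self._cache.get(schema_id)
        if schema is None:  # the "Schema not cached" branch of the diagram
            schema = self._fetch_schema(schema_id)
            self._cache[schema_id] = schema
            self.registry_calls += 1
        return self._validate(schema, payload)


def toy_validate(schema: dict, payload: bytes) -> bool:
    """Toy check: payload must be a JSON object containing every required field."""
    try:
        doc = json.loads(payload)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return False
    return isinstance(doc, dict) and all(k in doc for k in schema["required"])


registry = {1: {"required": ["id"]}}  # hypothetical registry contents
validator = CachingValidator(registry.__getitem__, toy_validate)
results = [validator.check(1, p) for p in (b'{"id": 1}', b'{"name": "x"}', b'{"id": 2}')]
print(results, validator.registry_calls)  # [True, False, True] 1
```

Three messages share one schema, so the registry is queried once and every later lookup is served from the cache.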

  

Validation types

The Schema Validation Consume policy supports the following validation options:

  • confluent_schema_registry: Validates messages against a Confluent Schema Registry. To use a Confluent Schema Registry for validation, first create a schema registry resource, then reference it in this policy.
  • json: Performs simple JSON parsing without a schema.
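A minimal Python sketch of the two options follows. `is_valid_json` mirrors the json option as described: the payload passes if it parses as JSON. `split_confluent_frame` assumes messages use the Confluent wire format (a zero magic byte followed by a 4-byte big-endian schema ID), which is how Confluent serializers typically frame payloads; whether Event Gateway locates the schema this way is an assumption, and both function names are hypothetical.

```python
import json
import struct
from typing import Optional, Tuple


def is_valid_json(payload: bytes) -> bool:
    """json option sketch: no schema, the payload only has to parse as JSON."""
    try:
        json.loads(payload)
    except ValueError:  # JSONDecodeError and UnicodeDecodeError both subclass ValueError
        return False
    return True


def split_confluent_frame(payload: bytes) -> Optional[Tuple[int, bytes]]:
    """Assumes Confluent wire format: magic byte 0, 4-byte big-endian schema ID, body."""
    if len(payload) < 5 or payload[0] != 0:
        return None  # not framed for a schema registry
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id, payload[5:]


print(is_valid_json(b'{"order_id": 7}'), is_valid_json(b'{"order_id":'))  # True False
print(split_confluent_frame(b"\x00\x00\x00\x00\x2a" + b'{"order_id": 7}'))  # (42, b'{"order_id": 7}')
```

The schema ID extracted this way is what a registry-backed validator would use to look up the schema before validating the remaining body.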

Failure modes

The Schema Validation Consume policy supports the following failure modes when validation fails:

  • skip: The message is not delivered to the client.
  • mark: Event Gateway adds a header for each part that failed validation before delivering the message to the client: kong/sverr-key if key validation failed, or kong/sverr-value if value validation failed. The value of each header is the ID of the client consuming the invalid message.
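The two failure modes can be sketched as a function that decides what happens to a record after its key and value have been checked. This is a hypothetical Python illustration: `Record` and `apply_failure_mode` are invented names, and headers are modeled as a plain string dictionary for readability, whereas Kafka record headers carry byte values.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Record:
    key: bytes
    value: bytes
    headers: Dict[str, str] = field(default_factory=dict)


def apply_failure_mode(record: Record, key_ok: bool, value_ok: bool,
                       mode: str, client_id: str) -> Optional[Record]:
    """Return the record to deliver, or None if it should be dropped."""
    if key_ok and value_ok:
        return record  # valid records pass through unchanged
    if mode == "skip":
        return None  # the client never sees the invalid record
    if mode == "mark":
        # Tag each failed part with the ID of the consuming client.
        if not key_ok:
            record.headers["kong/sverr-key"] = client_id
        if not value_ok:
            record.headers["kong/sverr-value"] = client_id
        return record
    raise ValueError(f"unknown failure mode: {mode!r}")


bad = Record(key=b"k1", value=b"not-json")
print(apply_failure_mode(bad, True, False, "skip", "client-a"))  # None
marked = apply_failure_mode(bad, True, False, "mark", "client-a")
print(marked.headers)  # {'kong/sverr-value': 'client-a'}
```

With mark, a consumer can inspect these headers to detect and handle invalid records itself, while skip hides them from the client entirely.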

Nested policies

This policy can serve as a parent policy. You can nest Modify Headers and Skip Records policies within it.

See the reference for nested policies for more detail.

Observability

Event Gateway emits a kong_keg_kafka_schema_validation_attempt_count metric with the following labels:

  • part - part of the record that was validated. Available values: key or value
  • result - result of the validation. Available values: success or fail
  • topic - name of the topic
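As one way to use these labels, the sketch below computes a per-topic, per-part failure ratio from metric samples. It assumes the samples have already been scraped into `(labels, value)` pairs; `failure_ratio` and the topic name `orders` are hypothetical, and assuming the metric is a cumulative counter, you would normally apply this to increases over a time window rather than to raw totals.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# One scraped sample: the metric's labels plus its value.
Sample = Tuple[Dict[str, str], float]


def failure_ratio(samples: Iterable[Sample]) -> Dict[Tuple[str, str], float]:
    """Fraction of failed validation attempts per (topic, part)."""
    totals: Dict[Tuple[str, str], float] = defaultdict(float)
    fails: Dict[Tuple[str, str], float] = defaultdict(float)
    for labels, value in samples:
        key = (labels["topic"], labels["part"])
        totals[key] += value
        if labels["result"] == "fail":
            fails[key] += value
    return {key: fails[key] / total for key, total in totals.items() if total > 0}


samples = [
    ({"topic": "orders", "part": "value", "result": "success"}, 90.0),
    ({"topic": "orders", "part": "value", "result": "fail"}, 10.0),
    ({"topic": "orders", "part": "key", "result": "success"}, 100.0),
]
print(failure_ratio(samples))  # {('orders', 'value'): 0.1, ('orders', 'key'): 0.0}
```

Grouping by both topic and part separates key failures from value failures, which matches the two header names used by the mark failure mode.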

Message rejection details are emitted at the DEBUG log level. To enable DEBUG-level logs, set the KEG__OBSERVABILITY__LOG_FLAGS environment variable to info,keg=debug.
