Schema Validation Produce

Related Documentation
Incompatible with
on-prem
Related Resources

This policy is used to validate messages using a provided schema during the produce phase.

Common use cases for the Schema Validation policy:

Use case

Description

Validate messages against a Confluent Schema Registry Ensure that all messages produced to any topic are validated against a Confluent Schema Registry, and mark messages that don’t conform.
Validate messages for subset of topics against JSON Ensure that all messages produced to topics with a specific prefix are valid JSONs, and reject messages that don’t conform.
Validate messages for a topic Ensure that all messages produced to a topic are validated against a schema, and reject messages that don’t conform.

How it works

This policy runs in the produce phase.

Here’s how schema validation gets applied:

  1. A Kafka client produces a message and sends it to Event Gateway.
  2. If not present already, Event Gateway pulls the schema from a schema registry. Event Gateway then validates the payload against the schema.
    • If the message passes validation, it gets passed along the Kafka broker, which processes it and sends a response.
    • If the message fails validation, the request is either rejected or marked as incorrect and passed to the broker.
  3. If the message is passed along, the broker processes the message as usual, and returns a response.
 
sequenceDiagram
  autonumber
  participant client as Client
  participant egw as Event Gateway
  participant schema as Schema registry
  participant broker as Kafka broker

  client->>egw: Produce message

  opt Schema not cached
    egw->>schema: Fetch schema
    schema-->>egw: Return schema
  end

  egw->>egw: Validate payload against schema

  alt Validation passed
    egw->>broker: Forward
  else Validation failed
    alt Failure mode: reject
      egw -x client: Reject the message
    else Failure mode: mark
      egw->>broker: Forward (marked invalid)
    end
  end

  

Validation types

The Schema Validation Produce policy supports the following validation options:

Validation option

Description

confluent_schema_registry Validates messages against the Confluent schema registry.

To use a Confluent schema registry for validation, first create a schema registry resource, then reference it in this policy.

json Simple JSON parsing without a schema.

Failure modes

The Schema Validation Produce policy supports the following failure modes in case the validation fails:

Failure mode

Description

reject Rejects the batch that contains invalid message.
mark Passes the message to the broker. Event Gateway adds additional headers to the message. kong/sverr-key if the key validation failed, kong/sverr-value if value validation failed. The value of the header is the ID of a client that produced invalid message.

Nested policies

This policy can serve as a parent policy. You can nest Modify Headers policies within it.

See the reference for nested policies for more detail.

Observability

Event Gateway emits a kong_keg_kafka_schema_validation_attempt_count metric with the following labels:

  • part - part of the record that was validated. Available values: key or value
  • result - result of the validation. Available values: success or fail
  • topic - name of the topic

Message rejection details are emitted at the DEBUG log level. To enable DEBUG level logs set the KEG__OBSERVABILITY__LOG_FLAGS environment variable to info,keg=debug.

Something wrong?

Help us make these docs great!

Kong Developer docs are open source. If you find these useful and want to make them better, contribute today!