Confluent Schema Registryv3.11+
Configure the Kafka Log plugin to use the Confluent Schema Registry for validation and message serialization.
Environment variables
-
KAFKA_TOPIC
: The name of your Kafka topic. -
BOOTSTRAP_SERVER_HOST
: The bootstrap server host. -
REGISTRY_URL
: The URL of your Confluent Schema Registry instance. For example,http://schema-registry:8081
.
Add this section to your declarative configuration file:
_format_version: "3.0"
plugins:
- name: kafka-log
config:
bootstrap_servers:
- host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
port: 9092
topic: ${{ env "DECK_KAFKA_TOPIC" }}
schema_registry:
confluent:
url: ${{ env "DECK_REGISTRY_URL" }}
authentication:
mode: basic
basic:
username: user
password: password
value_schema:
subject_name: kong-logs-value
schema_version: latest
Make the following request:
curl -i -X POST http://localhost:8001/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "kafka-log",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topic": "'$KAFKA_TOPIC'",
"schema_registry": {
"confluent": {
"url": "'$REGISTRY_URL'",
"authentication": {
"mode": "basic",
"basic": {
"username": "user",
"password": "password"
}
},
"value_schema": {
"subject_name": "kong-logs-value",
"schema_version": "latest"
}
}
}
}
}
'
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "kafka-log",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topic": "'$KAFKA_TOPIC'",
"schema_registry": {
"confluent": {
"url": "'$REGISTRY_URL'",
"authentication": {
"mode": "basic",
"basic": {
"username": "user",
"password": "password"
}
},
"value_schema": {
"subject_name": "kong-logs-value",
"schema_version": "latest"
}
}
}
}
}
'
Make sure to replace the following placeholders with your own values:
-
region
: Geographic region where your Kong Konnect is hosted and operates. -
controlPlaneId
: Theid
of the control plane. -
KONNECT_TOKEN
: Your Personal Access Token (PAT) associated with your Konnect account.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongClusterPlugin
metadata:
name: kafka-log
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
labels:
global: 'true'
config:
bootstrap_servers:
- host: '$BOOTSTRAP_SERVER_HOST'
port: 9092
topic: '$KAFKA_TOPIC'
schema_registry:
confluent:
url: '$REGISTRY_URL'
authentication:
mode: basic
basic:
username: user
password: password
value_schema:
subject_name: kong-logs-value
schema_version: latest
plugin: kafka-log
" | kubectl apply -f -
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_kafka_log" "my_kafka_log" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topic = var.kafka_topic
schema_registry = {
confluent = {
url = var.registry_url
authentication = {
mode = "basic"
basic = {
username = "user"
password = "password"
}
}
value_schema = {
subject_name = "kong-logs-value"
schema_version = "latest"
}
}
}
}
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value
.
variable "registry_url" {
type = string
}
Add this section to your declarative configuration file:
_format_version: "3.0"
plugins:
- name: kafka-log
service: serviceName|Id
config:
bootstrap_servers:
- host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
port: 9092
topic: ${{ env "DECK_KAFKA_TOPIC" }}
schema_registry:
confluent:
url: ${{ env "DECK_REGISTRY_URL" }}
authentication:
mode: basic
basic:
username: user
password: password
value_schema:
subject_name: kong-logs-value
schema_version: latest
Make sure to replace the following placeholders with your own values:
-
serviceName|Id
: Theid
orname
of the service the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/services/{serviceName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "kafka-log",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topic": "'$KAFKA_TOPIC'",
"schema_registry": {
"confluent": {
"url": "'$REGISTRY_URL'",
"authentication": {
"mode": "basic",
"basic": {
"username": "user",
"password": "password"
}
},
"value_schema": {
"subject_name": "kong-logs-value",
"schema_version": "latest"
}
}
}
}
}
'
Make sure to replace the following placeholders with your own values:
-
serviceName|Id
: Theid
orname
of the service the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/services/{serviceId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "kafka-log",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topic": "'$KAFKA_TOPIC'",
"schema_registry": {
"confluent": {
"url": "'$REGISTRY_URL'",
"authentication": {
"mode": "basic",
"basic": {
"username": "user",
"password": "password"
}
},
"value_schema": {
"subject_name": "kong-logs-value",
"schema_version": "latest"
}
}
}
}
}
'
Make sure to replace the following placeholders with your own values:
-
region
: Geographic region where your Kong Konnect is hosted and operates. -
controlPlaneId
: Theid
of the control plane. -
KONNECT_TOKEN
: Your Personal Access Token (PAT) associated with your Konnect account. -
serviceId
: Theid
of the service the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
name: kafka-log
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
config:
bootstrap_servers:
- host: '$BOOTSTRAP_SERVER_HOST'
port: 9092
topic: '$KAFKA_TOPIC'
schema_registry:
confluent:
url: '$REGISTRY_URL'
authentication:
mode: basic
basic:
username: user
password: password
value_schema:
subject_name: kong-logs-value
schema_version: latest
plugin: kafka-log
" | kubectl apply -f -
Next, apply the KongPlugin
resource by annotating the service
resource:
kubectl annotate -n kong service SERVICE_NAME konghq.com/plugins=kafka-log
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_kafka_log" "my_kafka_log" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topic = var.kafka_topic
schema_registry = {
confluent = {
url = var.registry_url
authentication = {
mode = "basic"
basic = {
username = "user"
password = "password"
}
}
value_schema = {
subject_name = "kong-logs-value"
schema_version = "latest"
}
}
}
}
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
service = {
id = konnect_gateway_service.my_service.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value
.
variable "registry_url" {
type = string
}
Add this section to your declarative configuration file:
_format_version: "3.0"
plugins:
- name: kafka-log
route: routeName|Id
config:
bootstrap_servers:
- host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
port: 9092
topic: ${{ env "DECK_KAFKA_TOPIC" }}
schema_registry:
confluent:
url: ${{ env "DECK_REGISTRY_URL" }}
authentication:
mode: basic
basic:
username: user
password: password
value_schema:
subject_name: kong-logs-value
schema_version: latest
Make sure to replace the following placeholders with your own values:
-
routeName|Id
: Theid
orname
of the route the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/routes/{routeName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "kafka-log",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topic": "'$KAFKA_TOPIC'",
"schema_registry": {
"confluent": {
"url": "'$REGISTRY_URL'",
"authentication": {
"mode": "basic",
"basic": {
"username": "user",
"password": "password"
}
},
"value_schema": {
"subject_name": "kong-logs-value",
"schema_version": "latest"
}
}
}
}
}
'
Make sure to replace the following placeholders with your own values:
-
routeName|Id
: Theid
orname
of the route the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/routes/{routeId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "kafka-log",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topic": "'$KAFKA_TOPIC'",
"schema_registry": {
"confluent": {
"url": "'$REGISTRY_URL'",
"authentication": {
"mode": "basic",
"basic": {
"username": "user",
"password": "password"
}
},
"value_schema": {
"subject_name": "kong-logs-value",
"schema_version": "latest"
}
}
}
}
}
'
Make sure to replace the following placeholders with your own values:
-
region
: Geographic region where your Kong Konnect is hosted and operates. -
controlPlaneId
: Theid
of the control plane. -
KONNECT_TOKEN
: Your Personal Access Token (PAT) associated with your Konnect account. -
routeId
: Theid
of the route the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
name: kafka-log
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
config:
bootstrap_servers:
- host: '$BOOTSTRAP_SERVER_HOST'
port: 9092
topic: '$KAFKA_TOPIC'
schema_registry:
confluent:
url: '$REGISTRY_URL'
authentication:
mode: basic
basic:
username: user
password: password
value_schema:
subject_name: kong-logs-value
schema_version: latest
plugin: kafka-log
" | kubectl apply -f -
Next, apply the KongPlugin
resource by annotating the httproute
or ingress
resource:
kubectl annotate -n kong httproute konghq.com/plugins=kafka-log
kubectl annotate -n kong ingress konghq.com/plugins=kafka-log
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_kafka_log" "my_kafka_log" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topic = var.kafka_topic
schema_registry = {
confluent = {
url = var.registry_url
authentication = {
mode = "basic"
basic = {
username = "user"
password = "password"
}
}
value_schema = {
subject_name = "kong-logs-value"
schema_version = "latest"
}
}
}
}
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
route = {
id = konnect_gateway_route.my_route.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value
.
variable "registry_url" {
type = string
}
Add this section to your declarative configuration file:
_format_version: "3.0"
plugins:
- name: kafka-log
consumer: consumerName|Id
config:
bootstrap_servers:
- host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
port: 9092
topic: ${{ env "DECK_KAFKA_TOPIC" }}
schema_registry:
confluent:
url: ${{ env "DECK_REGISTRY_URL" }}
authentication:
mode: basic
basic:
username: user
password: password
value_schema:
subject_name: kong-logs-value
schema_version: latest
Make sure to replace the following placeholders with your own values:
-
consumerName|Id
: Theid
orname
of the consumer the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/consumers/{consumerName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "kafka-log",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topic": "'$KAFKA_TOPIC'",
"schema_registry": {
"confluent": {
"url": "'$REGISTRY_URL'",
"authentication": {
"mode": "basic",
"basic": {
"username": "user",
"password": "password"
}
},
"value_schema": {
"subject_name": "kong-logs-value",
"schema_version": "latest"
}
}
}
}
}
'
Make sure to replace the following placeholders with your own values:
-
consumerName|Id
: Theid
orname
of the consumer the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/consumers/{consumerId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "kafka-log",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topic": "'$KAFKA_TOPIC'",
"schema_registry": {
"confluent": {
"url": "'$REGISTRY_URL'",
"authentication": {
"mode": "basic",
"basic": {
"username": "user",
"password": "password"
}
},
"value_schema": {
"subject_name": "kong-logs-value",
"schema_version": "latest"
}
}
}
}
}
'
Make sure to replace the following placeholders with your own values:
-
region
: Geographic region where your Kong Konnect is hosted and operates. -
controlPlaneId
: Theid
of the control plane. -
KONNECT_TOKEN
: Your Personal Access Token (PAT) associated with your Konnect account. -
consumerId
: Theid
of the consumer the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
name: kafka-log
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
config:
bootstrap_servers:
- host: '$BOOTSTRAP_SERVER_HOST'
port: 9092
topic: '$KAFKA_TOPIC'
schema_registry:
confluent:
url: '$REGISTRY_URL'
authentication:
mode: basic
basic:
username: user
password: password
value_schema:
subject_name: kong-logs-value
schema_version: latest
plugin: kafka-log
" | kubectl apply -f -
Next, apply the KongPlugin
resource by annotating the KongConsumer
resource:
kubectl annotate -n kong CONSUMER_NAME konghq.com/plugins=kafka-log
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_kafka_log" "my_kafka_log" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topic = var.kafka_topic
schema_registry = {
confluent = {
url = var.registry_url
authentication = {
mode = "basic"
basic = {
username = "user"
password = "password"
}
}
value_schema = {
subject_name = "kong-logs-value"
schema_version = "latest"
}
}
}
}
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
consumer = {
id = konnect_gateway_consumer.my_consumer.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value
.
variable "registry_url" {
type = string
}