Forward HTTP headers as Kafka record headers (allowlist mode) v3.15+
Use config.headers to copy HTTP request headers onto the produced Kafka record as native record headers.
This makes routing, tracing, and tenancy context available to consumers without parsing the message payload.
In allowlist mode (forward_all_by_default: false), only headers listed in include_headers are forwarded.
Use name_mappings to rename an HTTP header to a different record header key, for example renaming x-request-id to trace-id.
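To make the allowlist semantics concrete, here is a minimal Python sketch that models what the configuration on this page is described as doing. It is an illustration, not the plugin's implementation: the function name to_record_headers, the case-insensitive name matching, and the bare-comma separator used for repeated headers are assumptions. Record header values are emitted as bytes because Kafka stores header values as byte arrays.

```python
from typing import Dict, Iterable, List, Tuple

# Values copied from the example configuration on this page.
INCLUDE_HEADERS = {"x-request-id", "x-tenant-id", "x-correlation-id"}
NAME_MAPPINGS = {"x-request-id": "trace-id", "x-tenant-id": "tenant"}


def to_record_headers(
    http_headers: Iterable[Tuple[str, str]],
) -> List[Tuple[str, bytes]]:
    """Illustrative model of allowlist forwarding (hypothetical helper).

    Assumptions: header names are compared case-insensitively, and
    repeated headers are joined with a bare comma (concatenate_by_comma).
    """
    collected: Dict[str, List[str]] = {}
    for name, value in http_headers:
        key = name.lower()
        if key in INCLUDE_HEADERS:  # allowlist: everything else is dropped
            collected.setdefault(key, []).append(value)

    record_headers: List[Tuple[str, bytes]] = []
    for key, values in collected.items():
        record_key = NAME_MAPPINGS.get(key, key)  # rename only if mapped
        record_headers.append((record_key, ",".join(values).encode("utf-8")))
    return record_headers


# Sample request headers, invented for this illustration.
request_headers = [
    ("X-Request-Id", "abc-123"),
    ("X-Tenant-Id", "acme"),
    ("X-Tenant-Id", "acme-eu"),
    ("Authorization", "Bearer secret"),
]

for key, value in to_record_headers(request_headers):
    print(key, value.decode("utf-8"))
```

In this sample, only x-request-id and x-tenant-id pass the allowlist. The sketch emits them under the keys trace-id and tenant, and Authorization never reaches the record.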
Prerequisites
- Create an API key and secret in the cluster
Environment variables
- KAFKA_TOPIC: The name of your Kafka topic.
- CONFLUENT_CLUSTER_API_KEY: Your Confluent cluster API key.
- CONFLUENT_CLUSTER_API_SECRET: Your Confluent cluster API secret.
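The variables above supply the three connection fields of the plugin configuration. As a rough sketch, the following Python builds the same JSON body used by the API examples on this page from a mapping of those variables, and fails early when one is unset. The helper name build_plugin_body and the placeholder values in example_env are hypothetical.

```python
import json
from typing import Mapping

REQUIRED = (
    "KAFKA_TOPIC",
    "CONFLUENT_CLUSTER_API_KEY",
    "CONFLUENT_CLUSTER_API_SECRET",
)


def build_plugin_body(env: Mapping[str, str]) -> dict:
    """Build the plugin request body from environment-style values."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise ValueError("unset environment variables: " + ", ".join(missing))
    return {
        "name": "confluent",
        "config": {
            "topic": env["KAFKA_TOPIC"],
            "cluster_api_key": env["CONFLUENT_CLUSTER_API_KEY"],
            "cluster_api_secret": env["CONFLUENT_CLUSTER_API_SECRET"],
            "headers": {
                "forward_http_headers_as_record_headers": True,
                "forward_all_by_default": False,
                "include_headers": [
                    "x-request-id",
                    "x-tenant-id",
                    "x-correlation-id",
                ],
                "name_mappings": {
                    "x-request-id": "trace-id",
                    "x-tenant-id": "tenant",
                },
                "repeated_headers_behavior": "concatenate_by_comma",
            },
        },
        "tags": [],
    }


# Placeholder values for illustration; pass os.environ in real use.
example_env = {
    "KAFKA_TOPIC": "orders",
    "CONFLUENT_CLUSTER_API_KEY": "example-key",
    "CONFLUENT_CLUSTER_API_SECRET": "example-secret",
}
body = build_plugin_body(example_env)
# Print only the headers block so the secret is not echoed.
print(json.dumps(body["config"]["headers"], indent=2))
```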
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
  - name: confluent
    config:
      topic: ${{ env "DECK_KAFKA_TOPIC" }}
      cluster_api_key: ${{ env "DECK_CONFLUENT_CLUSTER_API_KEY" }}
      cluster_api_secret: ${{ env "DECK_CONFLUENT_CLUSTER_API_SECRET" }}
      headers:
        forward_http_headers_as_record_headers: true
        forward_all_by_default: false
        include_headers:
          - x-request-id
          - x-tenant-id
          - x-correlation-id
        name_mappings:
          x-request-id: trace-id
          x-tenant-id: tenant
        repeated_headers_behavior: concatenate_by_comma
Make the following request:
curl -i -X POST http://localhost:8001/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent",
"config": {
"topic": "'$KAFKA_TOPIC'",
"cluster_api_key": "'$CONFLUENT_CLUSTER_API_KEY'",
"cluster_api_secret": "'$CONFLUENT_CLUSTER_API_SECRET'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent",
"config": {
"topic": "'$KAFKA_TOPIC'",
"cluster_api_key": "'$CONFLUENT_CLUSTER_API_KEY'",
"cluster_api_secret": "'$CONFLUENT_CLUSTER_API_SECRET'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- region: Geographic region where your Kong Konnect is hosted and operates.
- KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account.
- controlPlaneId: The id of the control plane.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongClusterPlugin
metadata:
  name: confluent
  namespace: kong
  annotations:
    kubernetes.io/ingress.class: kong
    konghq.com/tags: ''
  labels:
    global: 'true'
config:
  topic: '$KAFKA_TOPIC'
  cluster_api_key: '$CONFLUENT_CLUSTER_API_KEY'
  cluster_api_secret: '$CONFLUENT_CLUSTER_API_SECRET'
  headers:
    forward_http_headers_as_record_headers: true
    forward_all_by_default: false
    include_headers:
      - x-request-id
      - x-tenant-id
      - x-correlation-id
    name_mappings:
      x-request-id: trace-id
      x-tenant-id: tenant
    repeated_headers_behavior: concatenate_by_comma
plugin: confluent
" | kubectl apply -f -
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent" "my_confluent" {
enabled = true
config = {
topic = var.kafka_topic
cluster_api_key = var.confluent_cluster_api_key
cluster_api_secret = var.confluent_cluster_api_secret
headers = {
forward_http_headers_as_record_headers = true
forward_all_by_default = false
include_headers = ["x-request-id", "x-tenant-id", "x-correlation-id"]
name_mappings = {
x-request-id = "trace-id"
x-tenant-id = "tenant"
}
repeated_headers_behavior = "concatenate_by_comma"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "confluent_cluster_api_secret" {
type = string
}
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
  - name: confluent
    service: serviceName|Id
    config:
      topic: ${{ env "DECK_KAFKA_TOPIC" }}
      cluster_api_key: ${{ env "DECK_CONFLUENT_CLUSTER_API_KEY" }}
      cluster_api_secret: ${{ env "DECK_CONFLUENT_CLUSTER_API_SECRET" }}
      headers:
        forward_http_headers_as_record_headers: true
        forward_all_by_default: false
        include_headers:
          - x-request-id
          - x-tenant-id
          - x-correlation-id
        name_mappings:
          x-request-id: trace-id
          x-tenant-id: tenant
        repeated_headers_behavior: concatenate_by_comma
Make sure to replace the following placeholders with your own values:
- serviceName|Id: The id or name of the service the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/services/{serviceName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent",
"config": {
"topic": "'$KAFKA_TOPIC'",
"cluster_api_key": "'$CONFLUENT_CLUSTER_API_KEY'",
"cluster_api_secret": "'$CONFLUENT_CLUSTER_API_SECRET'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- serviceName|Id: The id or name of the service the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/services/{serviceId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent",
"config": {
"topic": "'$KAFKA_TOPIC'",
"cluster_api_key": "'$CONFLUENT_CLUSTER_API_KEY'",
"cluster_api_secret": "'$CONFLUENT_CLUSTER_API_SECRET'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- region: Geographic region where your Kong Konnect is hosted and operates.
- KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account.
- controlPlaneId: The id of the control plane.
- serviceId: The id of the service the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: confluent
  namespace: kong
  annotations:
    kubernetes.io/ingress.class: kong
    konghq.com/tags: ''
config:
  topic: '$KAFKA_TOPIC'
  cluster_api_key: '$CONFLUENT_CLUSTER_API_KEY'
  cluster_api_secret: '$CONFLUENT_CLUSTER_API_SECRET'
  headers:
    forward_http_headers_as_record_headers: true
    forward_all_by_default: false
    include_headers:
      - x-request-id
      - x-tenant-id
      - x-correlation-id
    name_mappings:
      x-request-id: trace-id
      x-tenant-id: tenant
    repeated_headers_behavior: concatenate_by_comma
plugin: confluent
" | kubectl apply -f -
Next, apply the KongPlugin resource by annotating the service resource:
kubectl annotate -n kong service SERVICE_NAME konghq.com/plugins=confluent
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent" "my_confluent" {
enabled = true
config = {
topic = var.kafka_topic
cluster_api_key = var.confluent_cluster_api_key
cluster_api_secret = var.confluent_cluster_api_secret
headers = {
forward_http_headers_as_record_headers = true
forward_all_by_default = false
include_headers = ["x-request-id", "x-tenant-id", "x-correlation-id"]
name_mappings = {
x-request-id = "trace-id"
x-tenant-id = "tenant"
}
repeated_headers_behavior = "concatenate_by_comma"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
service = {
id = konnect_gateway_service.my_service.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "confluent_cluster_api_secret" {
type = string
}
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
  - name: confluent
    route: routeName|Id
    config:
      topic: ${{ env "DECK_KAFKA_TOPIC" }}
      cluster_api_key: ${{ env "DECK_CONFLUENT_CLUSTER_API_KEY" }}
      cluster_api_secret: ${{ env "DECK_CONFLUENT_CLUSTER_API_SECRET" }}
      headers:
        forward_http_headers_as_record_headers: true
        forward_all_by_default: false
        include_headers:
          - x-request-id
          - x-tenant-id
          - x-correlation-id
        name_mappings:
          x-request-id: trace-id
          x-tenant-id: tenant
        repeated_headers_behavior: concatenate_by_comma
Make sure to replace the following placeholders with your own values:
- routeName|Id: The id or name of the route the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/routes/{routeName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent",
"config": {
"topic": "'$KAFKA_TOPIC'",
"cluster_api_key": "'$CONFLUENT_CLUSTER_API_KEY'",
"cluster_api_secret": "'$CONFLUENT_CLUSTER_API_SECRET'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- routeName|Id: The id or name of the route the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/routes/{routeId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent",
"config": {
"topic": "'$KAFKA_TOPIC'",
"cluster_api_key": "'$CONFLUENT_CLUSTER_API_KEY'",
"cluster_api_secret": "'$CONFLUENT_CLUSTER_API_SECRET'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- region: Geographic region where your Kong Konnect is hosted and operates.
- KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account.
- controlPlaneId: The id of the control plane.
- routeId: The id of the route the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: confluent
  namespace: kong
  annotations:
    kubernetes.io/ingress.class: kong
    konghq.com/tags: ''
config:
  topic: '$KAFKA_TOPIC'
  cluster_api_key: '$CONFLUENT_CLUSTER_API_KEY'
  cluster_api_secret: '$CONFLUENT_CLUSTER_API_SECRET'
  headers:
    forward_http_headers_as_record_headers: true
    forward_all_by_default: false
    include_headers:
      - x-request-id
      - x-tenant-id
      - x-correlation-id
    name_mappings:
      x-request-id: trace-id
      x-tenant-id: tenant
    repeated_headers_behavior: concatenate_by_comma
plugin: confluent
" | kubectl apply -f -
Next, apply the KongPlugin resource by annotating the httproute or ingress resource:
kubectl annotate -n kong httproute HTTPROUTE_NAME konghq.com/plugins=confluent
kubectl annotate -n kong ingress INGRESS_NAME konghq.com/plugins=confluent
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent" "my_confluent" {
enabled = true
config = {
topic = var.kafka_topic
cluster_api_key = var.confluent_cluster_api_key
cluster_api_secret = var.confluent_cluster_api_secret
headers = {
forward_http_headers_as_record_headers = true
forward_all_by_default = false
include_headers = ["x-request-id", "x-tenant-id", "x-correlation-id"]
name_mappings = {
x-request-id = "trace-id"
x-tenant-id = "tenant"
}
repeated_headers_behavior = "concatenate_by_comma"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
route = {
id = konnect_gateway_route.my_route.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "confluent_cluster_api_secret" {
type = string
}
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
  - name: confluent
    consumer: consumerName|Id
    config:
      topic: ${{ env "DECK_KAFKA_TOPIC" }}
      cluster_api_key: ${{ env "DECK_CONFLUENT_CLUSTER_API_KEY" }}
      cluster_api_secret: ${{ env "DECK_CONFLUENT_CLUSTER_API_SECRET" }}
      headers:
        forward_http_headers_as_record_headers: true
        forward_all_by_default: false
        include_headers:
          - x-request-id
          - x-tenant-id
          - x-correlation-id
        name_mappings:
          x-request-id: trace-id
          x-tenant-id: tenant
        repeated_headers_behavior: concatenate_by_comma
Make sure to replace the following placeholders with your own values:
- consumerName|Id: The id or name of the consumer the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/consumers/{consumerName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent",
"config": {
"topic": "'$KAFKA_TOPIC'",
"cluster_api_key": "'$CONFLUENT_CLUSTER_API_KEY'",
"cluster_api_secret": "'$CONFLUENT_CLUSTER_API_SECRET'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- consumerName|Id: The id or name of the consumer the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/consumers/{consumerId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent",
"config": {
"topic": "'$KAFKA_TOPIC'",
"cluster_api_key": "'$CONFLUENT_CLUSTER_API_KEY'",
"cluster_api_secret": "'$CONFLUENT_CLUSTER_API_SECRET'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- region: Geographic region where your Kong Konnect is hosted and operates.
- KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account.
- controlPlaneId: The id of the control plane.
- consumerId: The id of the consumer the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: confluent
  namespace: kong
  annotations:
    kubernetes.io/ingress.class: kong
    konghq.com/tags: ''
config:
  topic: '$KAFKA_TOPIC'
  cluster_api_key: '$CONFLUENT_CLUSTER_API_KEY'
  cluster_api_secret: '$CONFLUENT_CLUSTER_API_SECRET'
  headers:
    forward_http_headers_as_record_headers: true
    forward_all_by_default: false
    include_headers:
      - x-request-id
      - x-tenant-id
      - x-correlation-id
    name_mappings:
      x-request-id: trace-id
      x-tenant-id: tenant
    repeated_headers_behavior: concatenate_by_comma
plugin: confluent
" | kubectl apply -f -
Next, apply the KongPlugin resource by annotating the KongConsumer resource:
kubectl annotate -n kong kongconsumer CONSUMER_NAME konghq.com/plugins=confluent
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent" "my_confluent" {
enabled = true
config = {
topic = var.kafka_topic
cluster_api_key = var.confluent_cluster_api_key
cluster_api_secret = var.confluent_cluster_api_secret
headers = {
forward_http_headers_as_record_headers = true
forward_all_by_default = false
include_headers = ["x-request-id", "x-tenant-id", "x-correlation-id"]
name_mappings = {
x-request-id = "trace-id"
x-tenant-id = "tenant"
}
repeated_headers_behavior = "concatenate_by_comma"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
consumer = {
id = konnect_gateway_consumer.my_consumer.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "confluent_cluster_api_secret" {
type = string
}