Forward HTTP headers as Kafka record headers (allowlist mode)v3.15+
Use config.headers to copy HTTP request headers onto the produced Kafka record as native record headers.
This makes routing, tracing, and tenancy context available to consumers without parsing the message payload.
In allowlist mode (forward_all_by_default: false), only headers listed in include_headers are forwarded.
Use name_mappings to rename an HTTP header to a different record header key, for example renaming x-request-id to trace-id.
Prerequisites
-
Kafka installed and running
Environment variables
-
KAFKA_TOPIC: The name of your Kafka topic.
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
- name: kafka-upstream
config:
topic: ${{ env "DECK_KAFKA_TOPIC" }}
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: false
include_headers:
- x-request-id
- x-tenant-id
- x-correlation-id
name_mappings:
x-request-id: trace-id
x-tenant-id: tenant
repeated_headers_behavior: concatenate_by_commaMake the following request:
curl -i -X POST http://localhost:8001/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "kafka-upstream",
"config": {
"topic": "'$KAFKA_TOPIC'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "kafka-upstream",
"config": {
"topic": "'$KAFKA_TOPIC'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'Make sure to replace the following placeholders with your own values:
-
region: Geographic region where your Kong Konnect is hosted and operates. -
KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account. -
controlPlaneId: Theidof the control plane.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongClusterPlugin
metadata:
name: kafka-upstream
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
konghq.com/tags: ''
labels:
global: 'true'
config:
topic: '$KAFKA_TOPIC'
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: false
include_headers:
- x-request-id
- x-tenant-id
- x-correlation-id
name_mappings:
x-request-id: trace-id
x-tenant-id: tenant
repeated_headers_behavior: concatenate_by_comma
plugin: kafka-upstream
" | kubectl apply -f -Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_kafka_upstream" "my_kafka_upstream" {
enabled = true
config = {
topic = var.kafka_topic
headers = {
forward_http_headers_as_record_headers = true
forward_all_by_default = false
include_headers = ["x-request-id", "x-tenant-id", "x-correlation-id"]
name_mappings = {
x-request-id = "trace-id"
x-tenant-id = "tenant"
}
repeated_headers_behavior = "concatenate_by_comma"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
}This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "kafka_topic" {
type = string
}Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
- name: kafka-upstream
service: serviceName|Id
config:
topic: ${{ env "DECK_KAFKA_TOPIC" }}
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: false
include_headers:
- x-request-id
- x-tenant-id
- x-correlation-id
name_mappings:
x-request-id: trace-id
x-tenant-id: tenant
repeated_headers_behavior: concatenate_by_commaMake sure to replace the following placeholders with your own values:
-
serviceName|Id: Theidornameof the service the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/services/{serviceName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "kafka-upstream",
"config": {
"topic": "'$KAFKA_TOPIC'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'Make sure to replace the following placeholders with your own values:
-
serviceName|Id: Theidornameof the service the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/services/{serviceId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "kafka-upstream",
"config": {
"topic": "'$KAFKA_TOPIC'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'Make sure to replace the following placeholders with your own values:
-
region: Geographic region where your Kong Konnect is hosted and operates. -
KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account. -
controlPlaneId: Theidof the control plane. -
serviceId: Theidof the service the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
name: kafka-upstream
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
konghq.com/tags: ''
config:
topic: '$KAFKA_TOPIC'
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: false
include_headers:
- x-request-id
- x-tenant-id
- x-correlation-id
name_mappings:
x-request-id: trace-id
x-tenant-id: tenant
repeated_headers_behavior: concatenate_by_comma
plugin: kafka-upstream
" | kubectl apply -f -Next, apply the KongPlugin resource by annotating the service resource:
kubectl annotate -n kong service SERVICE_NAME konghq.com/plugins=kafka-upstreamPrerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_kafka_upstream" "my_kafka_upstream" {
enabled = true
config = {
topic = var.kafka_topic
headers = {
forward_http_headers_as_record_headers = true
forward_all_by_default = false
include_headers = ["x-request-id", "x-tenant-id", "x-correlation-id"]
name_mappings = {
x-request-id = "trace-id"
x-tenant-id = "tenant"
}
repeated_headers_behavior = "concatenate_by_comma"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
service = {
id = konnect_gateway_service.my_service.id
}
}This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "kafka_topic" {
type = string
}Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
- name: kafka-upstream
route: routeName|Id
config:
topic: ${{ env "DECK_KAFKA_TOPIC" }}
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: false
include_headers:
- x-request-id
- x-tenant-id
- x-correlation-id
name_mappings:
x-request-id: trace-id
x-tenant-id: tenant
repeated_headers_behavior: concatenate_by_commaMake sure to replace the following placeholders with your own values:
-
routeName|Id: Theidornameof the route the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/routes/{routeName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "kafka-upstream",
"config": {
"topic": "'$KAFKA_TOPIC'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'Make sure to replace the following placeholders with your own values:
-
routeName|Id: Theidornameof the route the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/routes/{routeId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "kafka-upstream",
"config": {
"topic": "'$KAFKA_TOPIC'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'Make sure to replace the following placeholders with your own values:
-
region: Geographic region where your Kong Konnect is hosted and operates. -
KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account. -
controlPlaneId: Theidof the control plane. -
routeId: Theidof the route the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
name: kafka-upstream
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
konghq.com/tags: ''
config:
topic: '$KAFKA_TOPIC'
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: false
include_headers:
- x-request-id
- x-tenant-id
- x-correlation-id
name_mappings:
x-request-id: trace-id
x-tenant-id: tenant
repeated_headers_behavior: concatenate_by_comma
plugin: kafka-upstream
" | kubectl apply -f -Next, apply the KongPlugin resource by annotating the httproute or ingress resource:
kubectl annotate -n kong httproute konghq.com/plugins=kafka-upstreamkubectl annotate -n kong ingress konghq.com/plugins=kafka-upstreamPrerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_kafka_upstream" "my_kafka_upstream" {
enabled = true
config = {
topic = var.kafka_topic
headers = {
forward_http_headers_as_record_headers = true
forward_all_by_default = false
include_headers = ["x-request-id", "x-tenant-id", "x-correlation-id"]
name_mappings = {
x-request-id = "trace-id"
x-tenant-id = "tenant"
}
repeated_headers_behavior = "concatenate_by_comma"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
route = {
id = konnect_gateway_route.my_route.id
}
}This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "kafka_topic" {
type = string
}Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
- name: kafka-upstream
consumer: consumerName|Id
config:
topic: ${{ env "DECK_KAFKA_TOPIC" }}
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: false
include_headers:
- x-request-id
- x-tenant-id
- x-correlation-id
name_mappings:
x-request-id: trace-id
x-tenant-id: tenant
repeated_headers_behavior: concatenate_by_commaMake sure to replace the following placeholders with your own values:
-
consumerName|Id: Theidornameof the consumer the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/consumers/{consumerName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "kafka-upstream",
"config": {
"topic": "'$KAFKA_TOPIC'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'Make sure to replace the following placeholders with your own values:
-
consumerName|Id: Theidornameof the consumer the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/consumers/{consumerId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "kafka-upstream",
"config": {
"topic": "'$KAFKA_TOPIC'",
"headers": {
"forward_http_headers_as_record_headers": true,
"forward_all_by_default": false,
"include_headers": [
"x-request-id",
"x-tenant-id",
"x-correlation-id"
],
"name_mappings": {
"x-request-id": "trace-id",
"x-tenant-id": "tenant"
},
"repeated_headers_behavior": "concatenate_by_comma"
}
},
"tags": []
}
'Make sure to replace the following placeholders with your own values:
-
region: Geographic region where your Kong Konnect is hosted and operates. -
KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account. -
controlPlaneId: Theidof the control plane. -
consumerId: Theidof the consumer the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
name: kafka-upstream
namespace: kong
annotations:
kubernetes.io/ingress.class: kong
konghq.com/tags: ''
config:
topic: '$KAFKA_TOPIC'
headers:
forward_http_headers_as_record_headers: true
forward_all_by_default: false
include_headers:
- x-request-id
- x-tenant-id
- x-correlation-id
name_mappings:
x-request-id: trace-id
x-tenant-id: tenant
repeated_headers_behavior: concatenate_by_comma
plugin: kafka-upstream
" | kubectl apply -f -Next, apply the KongPlugin resource by annotating the KongConsumer resource:
kubectl annotate -n kong kongconsumer CONSUMER_NAME konghq.com/plugins=kafka-upstreamPrerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_kafka_upstream" "my_kafka_upstream" {
enabled = true
config = {
topic = var.kafka_topic
headers = {
forward_http_headers_as_record_headers = true
forward_all_by_default = false
include_headers = ["x-request-id", "x-tenant-id", "x-correlation-id"]
name_mappings = {
x-request-id = "trace-id"
x-tenant-id = "tenant"
}
repeated_headers_behavior = "concatenate_by_comma"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
consumer = {
id = konnect_gateway_consumer.my_consumer.id
}
}This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "kafka_topic" {
type = string
}