Consumer group ID per Kong Gateway Consumer (v3.15+)
Set the Kafka consumer group ID from the authenticated Kong Gateway Consumer’s username. Each Kong Gateway Consumer gets its own Kafka consumer group and independent offset tracking. Because group names match Kong Gateway Consumer usernames, operators can pre-allowlist them in Confluent Cloud ACLs.
If no Consumer is authenticated, the plugin falls back to random mode and logs a warning.
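The selection rule described above can be sketched in shell. This is only an illustration of the documented behavior, not the plugin's implementation: the helper name resolve_group_id and the format of the fallback ID are hypothetical.

```shell
# Illustration only: mirrors the documented rule, not the plugin's source code.
# resolve_group_id is a hypothetical helper; $1 is the authenticated
# Consumer's username, or an empty string when no Consumer is authenticated.
resolve_group_id() {
  if [ -n "$1" ]; then
    # Authenticated Consumer: the group ID is the Consumer's username.
    printf '%s\n' "$1"
  else
    # No Consumer: warn on stderr and fall back to a throwaway ID.
    # The "random-" prefix and the PID/timestamp suffix are stand-ins;
    # the plugin's real random ID format is not documented here.
    echo "warning: no authenticated Consumer, using a random group ID" >&2
    printf 'random-%s-%s\n' "$$" "$(date +%s)"
  fi
}

resolve_group_id "alice"   # prints: alice
resolve_group_id ""        # prints a fallback ID, with a warning on stderr
```

In the first branch the group ID is fully determined by the username, which is what makes it possible to allowlist group names ahead of time.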
Prerequisites
- You have a Kafka cluster in Confluent Cloud
- You have a Kafka topic in the cluster
- You have a Kong Gateway Consumer with a username configured
- The Kong Gateway Consumer’s username is allowlisted in your Confluent Cloud ACLs
- You have an authentication plugin configured (for example, key-auth)
Environment variables
- BOOTSTRAP_SERVER_HOST: The bootstrap server host
- API_KEY: The API key to use for authentication
- API_SECRET: The API secret to use for authentication
- KAFKA_TOPIC: The name of the Kafka topic to consume from
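As a minimal sketch, the variables listed above could be exported in the shell before running any of the examples. Every value here is a made-up placeholder, including the host name. The kong.yaml examples read DECK_-prefixed names, so the sketch mirrors each value under that prefix as well.

```shell
# Placeholder values only: replace each one with your own cluster details.
export BOOTSTRAP_SERVER_HOST="pkc-example.us-east-1.aws.confluent.cloud"  # hypothetical host
export API_KEY="example-api-key"        # hypothetical
export API_SECRET="example-api-secret"  # hypothetical
export KAFKA_TOPIC="example-topic"      # hypothetical

# The kong.yaml examples reference DECK_-prefixed variables,
# so mirror each value under that prefix.
export DECK_BOOTSTRAP_SERVER_HOST="$BOOTSTRAP_SERVER_HOST"
export DECK_API_KEY="$API_KEY"
export DECK_API_SECRET="$API_SECRET"
export DECK_KAFKA_TOPIC="$KAFKA_TOPIC"
```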
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
  - name: confluent-consume
    config:
      bootstrap_servers:
      - host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
        port: 9092
      topics:
      - name: ${{ env "DECK_KAFKA_TOPIC" }}
      mode: http-get
      cluster_api_key: ${{ env "DECK_API_KEY" }}
      cluster_api_secret: ${{ env "DECK_API_SECRET" }}
      consumer_group:
        mode: kong_consumer
Make the following request:
curl -i -X POST http://localhost:8001/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "http-get",
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'",
"consumer_group": {
"mode": "kong_consumer"
}
},
"tags": []
}
'
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "http-get",
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'",
"consumer_group": {
"mode": "kong_consumer"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- region: Geographic region where your Kong Konnect is hosted and operates.
- KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account.
- controlPlaneId: The id of the control plane.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongClusterPlugin
metadata:
  name: confluent-consume
  namespace: kong
  annotations:
    kubernetes.io/ingress.class: kong
    konghq.com/tags: ''
  labels:
    global: 'true'
config:
  bootstrap_servers:
  - host: '$BOOTSTRAP_SERVER_HOST'
    port: 9092
  topics:
  - name: '$KAFKA_TOPIC'
  mode: http-get
  cluster_api_key: '$API_KEY'
  cluster_api_secret: '$API_SECRET'
  consumer_group:
    mode: kong_consumer
plugin: confluent-consume
" | kubectl apply -f -
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent_consume" "my_confluent_consume" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topics = [
{
name = var.kafka_topic
} ]
mode = "http-get"
cluster_api_key = var.api_key
cluster_api_secret = var.api_secret
consumer_group = {
mode = "kong_consumer"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "kafka_topic" {
type = string
}
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
  - name: confluent-consume
    service: serviceName|Id
    config:
      bootstrap_servers:
      - host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
        port: 9092
      topics:
      - name: ${{ env "DECK_KAFKA_TOPIC" }}
      mode: http-get
      cluster_api_key: ${{ env "DECK_API_KEY" }}
      cluster_api_secret: ${{ env "DECK_API_SECRET" }}
      consumer_group:
        mode: kong_consumer
Make sure to replace the following placeholders with your own values:
- serviceName|Id: The id or name of the service the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/services/{serviceName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "http-get",
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'",
"consumer_group": {
"mode": "kong_consumer"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- serviceName|Id: The id or name of the service the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/services/{serviceId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "http-get",
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'",
"consumer_group": {
"mode": "kong_consumer"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- region: Geographic region where your Kong Konnect is hosted and operates.
- KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account.
- controlPlaneId: The id of the control plane.
- serviceId: The id of the service the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: confluent-consume
  namespace: kong
  annotations:
    kubernetes.io/ingress.class: kong
    konghq.com/tags: ''
config:
  bootstrap_servers:
  - host: '$BOOTSTRAP_SERVER_HOST'
    port: 9092
  topics:
  - name: '$KAFKA_TOPIC'
  mode: http-get
  cluster_api_key: '$API_KEY'
  cluster_api_secret: '$API_SECRET'
  consumer_group:
    mode: kong_consumer
plugin: confluent-consume
" | kubectl apply -f -
Next, apply the KongPlugin resource by annotating the service resource:
kubectl annotate -n kong service SERVICE_NAME konghq.com/plugins=confluent-consume
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent_consume" "my_confluent_consume" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topics = [
{
name = var.kafka_topic
} ]
mode = "http-get"
cluster_api_key = var.api_key
cluster_api_secret = var.api_secret
consumer_group = {
mode = "kong_consumer"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
service = {
id = konnect_gateway_service.my_service.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "kafka_topic" {
type = string
}
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
  - name: confluent-consume
    route: routeName|Id
    config:
      bootstrap_servers:
      - host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
        port: 9092
      topics:
      - name: ${{ env "DECK_KAFKA_TOPIC" }}
      mode: http-get
      cluster_api_key: ${{ env "DECK_API_KEY" }}
      cluster_api_secret: ${{ env "DECK_API_SECRET" }}
      consumer_group:
        mode: kong_consumer
Make sure to replace the following placeholders with your own values:
- routeName|Id: The id or name of the route the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/routes/{routeName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "http-get",
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'",
"consumer_group": {
"mode": "kong_consumer"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- routeName|Id: The id or name of the route the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/routes/{routeId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "http-get",
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'",
"consumer_group": {
"mode": "kong_consumer"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- region: Geographic region where your Kong Konnect is hosted and operates.
- KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account.
- controlPlaneId: The id of the control plane.
- routeId: The id of the route the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: confluent-consume
  namespace: kong
  annotations:
    kubernetes.io/ingress.class: kong
    konghq.com/tags: ''
config:
  bootstrap_servers:
  - host: '$BOOTSTRAP_SERVER_HOST'
    port: 9092
  topics:
  - name: '$KAFKA_TOPIC'
  mode: http-get
  cluster_api_key: '$API_KEY'
  cluster_api_secret: '$API_SECRET'
  consumer_group:
    mode: kong_consumer
plugin: confluent-consume
" | kubectl apply -f -
Next, apply the KongPlugin resource by annotating the httproute or ingress resource:
kubectl annotate -n kong httproute HTTPROUTE_NAME konghq.com/plugins=confluent-consume
kubectl annotate -n kong ingress INGRESS_NAME konghq.com/plugins=confluent-consume
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent_consume" "my_confluent_consume" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topics = [
{
name = var.kafka_topic
} ]
mode = "http-get"
cluster_api_key = var.api_key
cluster_api_secret = var.api_secret
consumer_group = {
mode = "kong_consumer"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
route = {
id = konnect_gateway_route.my_route.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "kafka_topic" {
type = string
}
Add this section to your kong.yaml configuration file:
_format_version: "3.0"
plugins:
  - name: confluent-consume
    consumer: consumerName|Id
    config:
      bootstrap_servers:
      - host: ${{ env "DECK_BOOTSTRAP_SERVER_HOST" }}
        port: 9092
      topics:
      - name: ${{ env "DECK_KAFKA_TOPIC" }}
      mode: http-get
      cluster_api_key: ${{ env "DECK_API_KEY" }}
      cluster_api_secret: ${{ env "DECK_API_SECRET" }}
      consumer_group:
        mode: kong_consumer
Make sure to replace the following placeholders with your own values:
- consumerName|Id: The id or name of the consumer the plugin configuration will target.
Make the following request:
curl -i -X POST http://localhost:8001/consumers/{consumerName|Id}/plugins/ \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "http-get",
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'",
"consumer_group": {
"mode": "kong_consumer"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- consumerName|Id: The id or name of the consumer the plugin configuration will target.
Make the following request:
curl -X POST https://{region}.api.konghq.com/v2/control-planes/{controlPlaneId}/core-entities/consumers/{consumerId}/plugins/ \
--header "accept: application/json" \
--header "Content-Type: application/json" \
--header "Authorization: Bearer $KONNECT_TOKEN" \
--data '
{
"name": "confluent-consume",
"config": {
"bootstrap_servers": [
{
"host": "'$BOOTSTRAP_SERVER_HOST'",
"port": 9092
}
],
"topics": [
{
"name": "'$KAFKA_TOPIC'"
}
],
"mode": "http-get",
"cluster_api_key": "'$API_KEY'",
"cluster_api_secret": "'$API_SECRET'",
"consumer_group": {
"mode": "kong_consumer"
}
},
"tags": []
}
'
Make sure to replace the following placeholders with your own values:
- region: Geographic region where your Kong Konnect is hosted and operates.
- KONNECT_TOKEN: Your Personal Access Token (PAT) associated with your Konnect account.
- controlPlaneId: The id of the control plane.
- consumerId: The id of the consumer the plugin configuration will target.
See the Konnect API reference to learn about region-specific URLs and personal access tokens.
echo "
apiVersion: configuration.konghq.com/v1
kind: KongPlugin
metadata:
  name: confluent-consume
  namespace: kong
  annotations:
    kubernetes.io/ingress.class: kong
    konghq.com/tags: ''
config:
  bootstrap_servers:
  - host: '$BOOTSTRAP_SERVER_HOST'
    port: 9092
  topics:
  - name: '$KAFKA_TOPIC'
  mode: http-get
  cluster_api_key: '$API_KEY'
  cluster_api_secret: '$API_SECRET'
  consumer_group:
    mode: kong_consumer
plugin: confluent-consume
" | kubectl apply -f -
Next, apply the KongPlugin resource by annotating the KongConsumer resource:
kubectl annotate -n kong kongconsumer CONSUMER_NAME konghq.com/plugins=confluent-consume
Prerequisite: Configure your Personal Access Token
terraform {
required_providers {
konnect = {
source = "kong/konnect"
}
}
}
provider "konnect" {
personal_access_token = "$KONNECT_TOKEN"
server_url = "https://us.api.konghq.com/"
}
Add the following to your Terraform configuration to create a Konnect Gateway Plugin:
resource "konnect_gateway_plugin_confluent_consume" "my_confluent_consume" {
enabled = true
config = {
bootstrap_servers = [
{
host = var.bootstrap_server_host
port = 9092
} ]
topics = [
{
name = var.kafka_topic
} ]
mode = "http-get"
cluster_api_key = var.api_key
cluster_api_secret = var.api_secret
consumer_group = {
mode = "kong_consumer"
}
}
tags = []
control_plane_id = konnect_gateway_control_plane.my_konnect_cp.id
consumer = {
id = konnect_gateway_consumer.my_consumer.id
}
}
This example requires the following variables to be added to your manifest. You can specify values at runtime by setting TF_VAR_name=value.
variable "kafka_topic" {
type = string
}
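The TF_VAR_name=value convention mentioned above could be applied as in the following sketch. The Terraform examples reference var.bootstrap_server_host, var.api_key, and var.api_secret in addition to var.kafka_topic, so the sketch assumes each of them has a matching variable block like the one shown. The fallback values after `:-` are made-up placeholders, used only when the corresponding environment variable is unset.

```shell
# Map the environment variables from this page onto Terraform input variables.
# Assumption: your manifest declares a variable block for each name below.
# The values after ":-" are hypothetical placeholders, not real credentials.
export TF_VAR_kafka_topic="${KAFKA_TOPIC:-example-topic}"
export TF_VAR_bootstrap_server_host="${BOOTSTRAP_SERVER_HOST:-pkc-example.confluent.cloud}"
export TF_VAR_api_key="${API_KEY:-example-api-key}"
export TF_VAR_api_secret="${API_SECRET:-example-api-secret}"
```

Terraform reads any environment variable named TF_VAR_ followed by a declared variable name, so no extra flags should be needed on the command line.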