The Bedrock ConverseStream API delivers model output as a sequence of events rather than a single complete response. This is particularly useful for function calling, where the interaction involves multiple round trips. Text appears in the terminal as it is generated, and tool use requests arrive as streamed chunks that your application reassembles.
The following script defines a top_song tool and uses converse_stream to interact with the model. When the model requests the tool, the script executes the function locally and then sends the result back through a second converse_stream call.
The stream delivers several event types: messageStart signals the beginning of a response, contentBlockStart announces a tool use block and carries its id and name, contentBlockDelta delivers tool input or text in fragments, contentBlockStop marks the end of a content block, and messageStop provides the stop reason.
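For illustration, a tool use request might stream as a sequence of events shaped roughly like this (payloads abbreviated; the toolUseId value and the fragment boundaries are hypothetical):

{'messageStart': {'role': 'assistant'}}
{'contentBlockStart': {'start': {'toolUse': {'toolUseId': 'tooluse_abc123', 'name': 'top_song'}}}}
{'contentBlockDelta': {'delta': {'toolUse': {'input': '{"sign"'}}}}
{'contentBlockDelta': {'delta': {'toolUse': {'input': ': "WZPZ"}'}}}}
{'contentBlockStop': {}}
{'messageStop': {'stopReason': 'tool_use'}}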
Create the script:
cat > bedrock-stream-tool-use-demo.py << 'EOF'
#!/usr/bin/env python3
"""Demonstrate streaming function calling through Kong's AI Gateway"""
import logging
import json
import boto3
from botocore.exceptions import ClientError
GATEWAY_URL = "http://localhost:8000"
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")


class StationNotFoundError(Exception):
    """Raised when a radio station isn't found."""
    pass


def get_top_song(call_sign):
    """Returns the most popular song for the given radio station call sign."""
    if call_sign == 'WZPZ':
        return "Elemental Hotel", "8 Storey Hike"
    raise StationNotFoundError(f"Station {call_sign} not found.")


def stream_messages(bedrock_client, model_id, messages, tool_config):
    """Sends a message and processes the streamed response.

    Reassembles text and tool use content from stream events.
    Text chunks are printed to stdout as they arrive.

    Returns:
        stop_reason: The reason the model stopped generating.
        message: The fully reassembled response message.
    """
    logger.info("Streaming messages with model %s", model_id)

    response = bedrock_client.converse_stream(
        modelId=model_id,
        messages=messages,
        toolConfig=tool_config
    )

    stop_reason = ""
    message = {}
    content = []
    message['content'] = content
    text = ''
    tool_use = {}

    for chunk in response['stream']:
        if 'messageStart' in chunk:
            message['role'] = chunk['messageStart']['role']
        elif 'contentBlockStart' in chunk:
            # Emitted for tool use blocks: carries the tool's id and name.
            tool = chunk['contentBlockStart']['start']['toolUse']
            tool_use['toolUseId'] = tool['toolUseId']
            tool_use['name'] = tool['name']
        elif 'contentBlockDelta' in chunk:
            delta = chunk['contentBlockDelta']['delta']
            if 'toolUse' in delta:
                # Tool input arrives as JSON string fragments; accumulate them.
                if 'input' not in tool_use:
                    tool_use['input'] = ''
                tool_use['input'] += delta['toolUse']['input']
            elif 'text' in delta:
                text += delta['text']
                print(delta['text'], end='')
        elif 'contentBlockStop' in chunk:
            if 'input' in tool_use:
                # The accumulated fragments now form a complete JSON document.
                tool_use['input'] = json.loads(tool_use['input'])
                content.append({'toolUse': tool_use})
                tool_use = {}
            else:
                content.append({'text': text})
                text = ''
        elif 'messageStop' in chunk:
            stop_reason = chunk['messageStop']['stopReason']

    return stop_reason, message


def main():
    model_id = "cohere.command-r-v1:0"
    input_text = "What is the most popular song on WZPZ?"

    try:
        bedrock_client = boto3.client(
            "bedrock-runtime",
            region_name="us-west-2",
            endpoint_url=GATEWAY_URL,
            aws_access_key_id="dummy",
            aws_secret_access_key="dummy",
        )

        messages = [{"role": "user", "content": [{"text": input_text}]}]

        tool_config = {
            "tools": [
                {
                    "toolSpec": {
                        "name": "top_song",
                        "description": "Get the most popular song played on a radio station.",
                        "inputSchema": {
                            "json": {
                                "type": "object",
                                "properties": {
                                    "sign": {
                                        "type": "string",
                                        "description": "The call sign for the radio station for which you want the most popular song. Example call signs are WZPZ and WKRP."
                                    }
                                },
                                "required": ["sign"]
                            }
                        }
                    }
                }
            ]
        }

        # First round: the model decides whether to call the tool.
        stop_reason, message = stream_messages(
            bedrock_client, model_id, messages, tool_config)
        messages.append(message)

        if stop_reason == "tool_use":
            for block in message['content']:
                if 'toolUse' in block:
                    tool = block['toolUse']

                    if tool['name'] == 'top_song':
                        try:
                            song, artist = get_top_song(tool['input']['sign'])
                            tool_result = {
                                "toolUseId": tool['toolUseId'],
                                "content": [{"json": {"song": song, "artist": artist}}]
                            }
                        except StationNotFoundError as err:
                            tool_result = {
                                "toolUseId": tool['toolUseId'],
                                "content": [{"text": err.args[0]}],
                                "status": 'error'
                            }

                        # Tool results go back to the model in a user message.
                        messages.append({
                            "role": "user",
                            "content": [{"toolResult": tool_result}]
                        })

            # Second round: the model answers using the tool result.
            stop_reason, message = stream_messages(
                bedrock_client, model_id, messages, tool_config)

    except ClientError as err:
        message = err.response['Error']['Message']
        logger.error("A client error occurred: %s", message)
        print(f"A client error occurred: {message}")
    else:
        print(f"\nFinished streaming messages with model {model_id}.")


if __name__ == "__main__":
    main()
EOF
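Run the script:

python3 bedrock-stream-tool-use-demo.py

With the gateway listening on localhost:8000 and the Bedrock plugin configured, the output should look roughly like the following; extra log lines may appear and the exact wording of the final answer will vary:

INFO: Streaming messages with model cohere.command-r-v1:0
INFO: Streaming messages with model cohere.command-r-v1:0
The most popular song played on WZPZ is Elemental Hotel by 8 Storey Hike.
Finished streaming messages with model cohere.command-r-v1:0.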
The script points a Boto3 client at the AI Gateway route (http://localhost:8000) with dummy credentials. AI Gateway replaces these credentials with the real AWS keys from the plugin configuration before forwarding to Bedrock.
The interaction follows two streaming rounds:
- The first converse_stream call sends the user question and the tool definition. The model responds with a stream that contains a tool use request, delivering the function name (top_song) and input arguments ({"sign": "WZPZ"}) across multiple contentBlockDelta events. The script reassembles these fragments into a complete tool call.
- The script executes get_top_song("WZPZ") locally and appends the result to the message history. A second converse_stream call sends the full conversation, including the tool result. The model streams its final answer, with each text chunk printed to the terminal as it arrives.
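The second converse_stream call therefore sends a message history shaped roughly like the sketch below. The toolUseId value here is illustrative (the model assigns the real one), and the toolResult id must match the toolUse id:

messages = [
    {"role": "user",
     "content": [{"text": "What is the most popular song on WZPZ?"}]},
    {"role": "assistant",
     "content": [{"toolUse": {"toolUseId": "tooluse_abc123",  # illustrative id
                              "name": "top_song",
                              "input": {"sign": "WZPZ"}}}]},
    {"role": "user",
     "content": [{"toolResult": {"toolUseId": "tooluse_abc123",  # must match the toolUse id
                                 "content": [{"json": {"song": "Elemental Hotel",
                                                       "artist": "8 Storey Hike"}}]}}]},
]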