cat <<'EOF' > demo.py
"""Voice AI pipeline observability demo. See README for context."""
import os
import sys
import tempfile
import time
import uuid
from pathlib import Path
from openai import APIStatusError, OpenAI
from opentelemetry import context as otel_context
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
PROXY_URL = os.getenv("PROXY_URL", "http://localhost:8000")
CHAT_MODEL = os.getenv("CHAT_MODEL", "gpt-4o")
VOICE_API_KEY = os.getenv("VOICE_API_KEY", "voice-demo-key")
SESSION_ID = os.getenv("SESSION_ID", f"voice-demo-{uuid.uuid4().hex[:12]}")
# ANSI color codes. Disabled when stdout isn't a TTY or NO_COLOR is set.
_USE_COLOR = sys.stdout.isatty() and "NO_COLOR" not in os.environ
def _c(code: str, s: str) -> str:
return f"\033[{code}m{s}\033[0m" if _USE_COLOR else s
BOLD = lambda s: _c("1", s)
DIM = lambda s: _c("2", s)
GREEN = lambda s: _c("32", s)
BLUE = lambda s: _c("34", s)
CYAN = lambda s: _c("36", s)
RED = lambda s: _c("31", s)
MAGENTA= lambda s: _c("35", s)
# Langfuse OTLP destination and credentials are mandatory; indexing
# os.environ (rather than .get) fails fast with KeyError when one is absent.
_required = os.environ
LANGFUSE_OTLP_ENDPOINT = _required["DECK_LANGFUSE_OTLP_ENDPOINT"]
LANGFUSE_AUTH_HEADER = _required["DECK_LANGFUSE_AUTH_HEADER"]
# DECK_OPENAI_TOKEN carries the "Bearer " prefix Kong needs; strip it for the
# direct OpenAI client used by the setup phase.
RAW_OPENAI_KEY = _required["DECK_OPENAI_TOKEN"].removeprefix("Bearer ").strip()
# ---------------------------------------------------------------------------
# OpenTelemetry SDK setup
# ---------------------------------------------------------------------------
# Spans batch locally and ship over OTLP/HTTP to the Langfuse ingestion
# endpoint; the Authorization header is passed straight from the environment.
_exporter = OTLPSpanExporter(
    endpoint=LANGFUSE_OTLP_ENDPOINT,
    headers={
        "Authorization": LANGFUSE_AUTH_HEADER,
        "x-langfuse-ingestion-version": "4",
    },
)
provider = TracerProvider(
    resource=Resource.create({"service.name": "voice-ai-demo"})
)
provider.add_span_processor(BatchSpanProcessor(_exporter))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("voice-ai-demo")

# Auto-trace every httpx request (the OpenAI SDK uses httpx under the hood).
HTTPXClientInstrumentor().instrument()
# ---------------------------------------------------------------------------
# Kong-pointed clients (key-auth via apikey header in default_headers)
# ---------------------------------------------------------------------------
#
# The OpenAI SDK reserves api_key for the Authorization: Bearer header. Kong
# replaces that header server-side with the real provider credential, so we
# pass the recipe's key-auth credential through default_headers instead.
def _kong_client(route: str) -> OpenAI:
    """Build an OpenAI client pointed at one Kong route of the recipe."""
    return OpenAI(
        base_url=f"{PROXY_URL}/voice-ai-observability/{route}",
        api_key="placeholder",
        default_headers={"apikey": VOICE_API_KEY},
    )


stt_client = _kong_client("stt")
llm_client = _kong_client("llm")
tts_client = _kong_client("tts")
# ---------------------------------------------------------------------------
# Out-of-band setup: synthesize question audio directly via OpenAI
# ---------------------------------------------------------------------------
def synthesize_question_audio(text):
    """Synthesize question audio directly via OpenAI, bypassing Kong.

    This stands in for a microphone source. The call is suppressed from
    OpenTelemetry instrumentation so it does not appear in Langfuse, since
    the recipe traces what happens through Kong, and a real voice agent
    would receive raw audio from a hardware input.
    """
    client = OpenAI(api_key=RAW_OPENAI_KEY, base_url="https://api.openai.com/v1")
    # Suppress httpx instrumentation for this call so the setup span is not
    # emitted to Langfuse (would otherwise surface as an unparented orphan).
    suppressed_ctx = otel_context.set_value("suppress_instrumentation", True)
    token = otel_context.attach(suppressed_ctx)
    try:
        speech = client.audio.speech.create(
            model="tts-1", voice="alloy", input=text
        )
        return speech.read()
    finally:
        otel_context.detach(token)
# ---------------------------------------------------------------------------
# Negative path: invalid key-auth credential
# ---------------------------------------------------------------------------
def check_auth_boundary():
    """Send a request with an invalid key, expecting Kong to return 401.

    Confirms key-auth is enforced on the recipe's Routes before any traced
    turn runs. Suppress instrumentation so this setup probe doesn't pollute
    Langfuse with an unauthenticated outlier.
    """
    probe = OpenAI(
        base_url=f"{PROXY_URL}/voice-ai-observability/llm",
        api_key="placeholder",
        default_headers={"apikey": "wrong-key"},
    )
    token = otel_context.attach(
        otel_context.set_value("suppress_instrumentation", True)
    )
    status = None
    message = ""
    try:
        probe.chat.completions.create(
            model=CHAT_MODEL,
            messages=[{"role": "user", "content": "ping"}],
        )
    except APIStatusError as exc:
        status = exc.status_code
        message = exc.message[:80]
    finally:
        otel_context.detach(token)
    if status is None:
        print(f" {RED(BOLD('[AUTH]'))} unexpected: invalid key was accepted")
        return None
    print(f" {GREEN(BOLD('[AUTH]'))} expected reject -> {RED(BOLD(str(status)))} {message}")
    return status
# ---------------------------------------------------------------------------
# Pipeline stages: three traced hops through Kong
# ---------------------------------------------------------------------------
def speech_to_text(audio_bytes):
    """Transcribe audio via the Kong STT route."""
    # The transcription endpoint wants a real file handle, so stage the audio
    # bytes in a named temp file and remove it once the request completes.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
        tmp.write(audio_bytes)
        audio_path = Path(tmp.name)
    try:
        started = time.perf_counter()
        with audio_path.open("rb") as audio_file:
            result = stt_client.audio.transcriptions.create(
                model="whisper-1", file=audio_file
            )
        duration = time.perf_counter() - started
    finally:
        audio_path.unlink(missing_ok=True)
    print(f" {CYAN(BOLD('[STT]'))} \"{result.text}\" {DIM(f'({duration:.3f}s)')}")
    return result.text, duration
def llm_chat(messages):
    """Send the conversation so far to the LLM via the Kong chat route."""
    started = time.perf_counter()
    raw = llm_client.chat.completions.with_raw_response.create(
        model=CHAT_MODEL, messages=messages
    )
    wall = time.perf_counter() - started
    completion = raw.parse()
    reply = completion.choices[0].message.content
    usage = completion.usage
    # Kong annotates the response with the model it actually routed to and
    # the upstream latency it observed; surface both next to wall-clock time.
    kong_model = raw.headers.get("x-kong-llm-model", CHAT_MODEL)
    upstream_ms = raw.headers.get("x-kong-upstream-latency", "-")
    print(f" {CYAN(BOLD('[LLM]'))} \"{reply}\"")
    stats = " ".join(
        [
            f"Model: {kong_model}",
            f"Tokens: {usage.prompt_tokens} in / {usage.completion_tokens} out",
            f"Upstream: {upstream_ms}ms ({wall:.3f}s wall)",
        ]
    )
    print(f" {DIM(stats)}")
    return reply, wall
def text_to_speech(text):
    """Synthesize the LLM response to audio via the Kong TTS route."""
    started = time.perf_counter()
    speech = tts_client.audio.speech.create(
        model="tts-1", voice="alloy", input=text
    )
    payload = speech.read()
    duration = time.perf_counter() - started
    print(f" {CYAN(BOLD('[TTS]'))} Generated {len(payload):,} bytes of audio {DIM(f'({duration:.3f}s)')}")
    return payload, duration
# ---------------------------------------------------------------------------
# Per-turn orchestration
# ---------------------------------------------------------------------------
def run_turn(question, audio_bytes, turn_number, history):
    """Execute one full cascading voice pipeline turn under a `voice-turn` span.

    `history` is the running OpenAI-format messages list for the conversation;
    this turn's user transcription and assistant reply are appended to it so
    later turns see prior context.
    """
    print(f"\n{BOLD(f'Turn {turn_number}:')} \"{question}\"")
    print("=" * 60)
    span_attributes = {
        "langfuse.session.id": SESSION_ID,
        "langfuse.trace.name": f"voice-turn-{turn_number}",
        "langfuse.trace.tags": ["voice-ai", "cascading-pipeline"],
        "voice.turn.number": turn_number,
        "voice.question": question,
    }
    with tracer.start_as_current_span(
        "voice-turn", attributes=span_attributes
    ) as span:
        print("\n1. Speech -> Text (STT)")
        transcription, stt_s = speech_to_text(audio_bytes)
        history.append({"role": "user", "content": transcription})

        print("\n2. Transcription -> LLM")
        response_text, llm_s = llm_chat(history)
        history.append({"role": "assistant", "content": response_text})

        print("\n3. LLM Response -> Speech (TTS)")
        _, tts_s = text_to_speech(response_text)

        span.set_attribute("voice.response", response_text)
        trace_id = f"{span.get_span_context().trace_id:032x}"
        timings = {"stt": stt_s, "llm": llm_s, "tts": tts_s}
        total = sum(timings.values())
        breakdown = " + ".join(f"{k}: {v:.3f}s" for k, v in timings.items())
        print(f"\n{'-' * 60}")
        print(f"{BOLD('Turn complete:')} {total:.3f}s total {DIM(f'({breakdown})')}")
        # Trace ID is the link readers paste into Langfuse. That's the headline output.
        print(f"Trace ID: {MAGENTA(BOLD(trace_id))}")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    print(BOLD("Voice AI Pipeline Observability Demo"))
    print("=" * 60)
    print(f"Kong Proxy: {PROXY_URL}")
    print(f"Chat Model: {CYAN(BOLD(CHAT_MODEL))}")
    # Session ID is what the reader uses to find this run in Langfuse.
    print(f"Session ID: {BLUE(BOLD(SESSION_ID))}")

    print(f"\n{DIM('Checking key-auth boundary...')}")
    check_auth_boundary()

    questions = [
        "What are the three laws of robotics?",
        "Who wrote them?",
        "What year were they first published?",
    ]

    print(f"\n{DIM('Synthesizing question audio (out of band, not traced)...')}")
    audio_clips = []
    for q in questions:
        audio_clips.append(synthesize_question_audio(q))
        synthesized_note = f'Synthesized: "{q}"'
        print(f" {DIM(synthesized_note)}")

    system_prompt = (
        "You are a helpful voice assistant. Always answer in a single "
        "spoken sentence. Never use bullet points, numbered lists, or "
        "markdown. Spell out numbers and acronyms as words. Your reply "
        "will be read aloud by a text-to-speech engine."
    )
    history = [{"role": "system", "content": system_prompt}]

    for turn_no, (question, audio) in enumerate(zip(questions, audio_clips), start=1):
        run_turn(question, audio, turn_no, history)

    print()
    print("=" * 60)
    print(f"{BOLD('Session complete:')} {BLUE(BOLD(SESSION_ID))}")
    print(f"View this conversation in Langfuse -> Sessions -> {BLUE(BOLD(SESSION_ID))}")
    # BatchSpanProcessor flushes asynchronously; block until exporter drains.
    provider.shutdown()
    sys.exit(0)
EOF