The voice pipeline architecture
Every voice agent you have built so far uses a pipeline that LiveKit wires together behind the scenes. In this chapter, you will crack open that pipeline, understand each node, and learn why every node is an async generator you can override to inject custom logic at any stage.
What you'll learn
- The full pipeline path from microphone audio to speaker output
- How each node is an async generator that yields streaming data
- How Agent.default wires the default pipeline together
- How data flows between nodes in a streaming fashion
The full pipeline
When a user speaks to your voice agent, their audio travels through a series of processing stages before a response reaches their ears. Here is the complete path:
Voice pipeline:

Audio In → stt_node → on_user_turn_completed → llm_node → tts_node → Audio Out
Each stage is a node — a method on your Agent class that you can override. The pipeline is not a batch process. Data streams through each node as it becomes available: STT yields partial transcripts, the LLM yields tokens one at a time, and TTS begins synthesizing audio before the LLM finishes its response.
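This streaming composition can be sketched with plain asyncio and no LiveKit at all. The stub stages below are made up for illustration; each one consumes an upstream async generator and yields downstream as soon as each item is ready:

```python
import asyncio

# Hypothetical stub stages standing in for the real nodes.
async def stt_stub(frames):
    # Yields a transcript per audio "frame" as it arrives.
    async for frame in frames:
        yield f"transcript-{frame}"

async def llm_stub(transcripts):
    # Starts emitting tokens as soon as the first transcript arrives.
    async for text in transcripts:
        yield f"token({text})"

async def tts_stub(tokens):
    # Produces an audio chunk per token, without waiting for the full reply.
    async for token in tokens:
        yield f"audio[{token}]"

async def mic(n):
    for i in range(n):
        yield i

async def run_pipeline():
    # Chain the stages exactly like the real pipeline: each node's output
    # stream is the next node's input stream.
    pipeline = tts_stub(llm_stub(stt_stub(mic(3))))
    return [chunk async for chunk in pipeline]

print(asyncio.run(run_pipeline()))
# → ['audio[token(transcript-0)]', 'audio[token(transcript-1)]', 'audio[token(transcript-2)]']
```

Each chunk traverses the whole chain individually; no stage waits for its upstream to finish the entire stream.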
Audio In
Raw audio frames arrive from the user's microphone via WebRTC. LiveKit handles audio capture, encoding, and transport. By the time audio reaches your agent, it is a stream of PCM audio frames.
stt_node
The STT node receives the audio stream and yields speech events — interim transcripts, final transcripts, and speech boundary markers. You override this node to intercept, filter, or augment transcription results before they reach the LLM.
on_user_turn_completed
Once a complete user turn is detected (the user has finished speaking), this hook fires. It receives a turn context containing the user's message and the conversation history. This is where you inject RAG context, validate input, or decide whether to respond at all.
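The shape of such a hook can be sketched without LiveKit; the list-of-dicts chat history and the retrieve_docs helper below are hypothetical stand-ins, not LiveKit's actual ChatContext API:

```python
import asyncio

async def retrieve_docs(query: str) -> str:
    # Stand-in for a real vector-store lookup (hypothetical).
    return f"docs about {query!r}"

class RagAgent:
    """Sketch of a turn-completed hook that injects RAG context."""

    async def on_user_turn_completed(self, chat_history: list, new_message: str):
        context = await retrieve_docs(new_message)
        # Append retrieved context so the LLM sees it alongside the user turn.
        chat_history.append({"role": "system", "content": context})

async def main():
    history = []
    await RagAgent().on_user_turn_completed(history, "return policy")
    return history

print(asyncio.run(main()))
# → [{'role': 'system', 'content': "docs about 'return policy'"}]
```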
llm_node
The LLM node receives the chat context and yields response chunks as the model generates tokens. You override this node to parse structured output, add chain-of-thought reasoning, or transform the response format.
tts_node
The TTS node receives text and yields synthesized audio chunks. You override this node to change voice parameters, inject SSML, or route to different TTS providers based on content.
Audio Out
Synthesized audio frames are sent back to the user via WebRTC. LiveKit handles encoding, transport, and playback.
Think of the pipeline as a series of garden hoses connected end to end. Water (data) enters the first hose and flows through each one. An async generator is a hose that transforms the water as it passes through — it receives a stream on one end and produces a stream on the other. You can splice in a new hose at any point to filter, enrich, or redirect the flow without disrupting the rest of the chain.
Async generators: the streaming primitive
Every pipeline node is an async generator — a Python function (or TypeScript equivalent) that uses yield to produce values one at a time rather than returning a single result. This is what makes the pipeline truly streaming.
```python
from livekit.agents import Agent, stt, llm, tts
import typing


class MyAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="You are a helpful assistant."
        )

    async def stt_node(
        self, audio: stt.SpeechStream
    ) -> typing.AsyncGenerator[stt.SpeechEvent, None]:
        """Each speech event is yielded as it arrives — no buffering."""
        async for event in Agent.default.stt_node(self, audio):
            # You can inspect, modify, or filter each event here
            yield event

    async def llm_node(
        self, chat_ctx: llm.ChatContext
    ) -> typing.AsyncGenerator[llm.ChatChunk, None]:
        """Each token chunk is yielded as the LLM generates it."""
        async for chunk in Agent.default.llm_node(self, chat_ctx):
            yield chunk

    async def tts_node(
        self, text: str
    ) -> typing.AsyncGenerator[tts.SynthesizedAudio, None]:
        """Each audio chunk is yielded as TTS synthesizes it."""
        async for audio in Agent.default.tts_node(self, text):
            yield audio
```

```typescript
import { Agent, stt, llm, tts } from "@livekit/agents";

class MyAgent extends Agent {
  constructor() {
    super({ instructions: "You are a helpful assistant." });
  }

  async *sttNode(
    audio: stt.SpeechStream
  ): AsyncGenerator<stt.SpeechEvent> {
    for await (const event of Agent.default.sttNode(this, audio)) {
      // Inspect, modify, or filter each event here
      yield event;
    }
  }

  async *llmNode(
    chatCtx: llm.ChatContext
  ): AsyncGenerator<llm.ChatChunk> {
    for await (const chunk of Agent.default.llmNode(this, chatCtx)) {
      yield chunk;
    }
  }

  async *ttsNode(
    text: string
  ): AsyncGenerator<tts.SynthesizedAudio> {
    for await (const audio of Agent.default.ttsNode(this, text)) {
      yield audio;
    }
  }
}
```

The pattern is always the same: call Agent.default.<node_name> to invoke the default implementation, iterate over its output, and yield each item — optionally transforming it along the way.
Async generators are lazy
An async generator does not execute until something iterates over it. The pipeline orchestrator pulls from each node on demand, which means data flows naturally without buffering. If the TTS node is slow to consume, the LLM node pauses automatically — backpressure is built in.
Agent.default: the escape hatch
Every node override follows a critical pattern: calling Agent.default to invoke the built-in behavior. This is not inheritance — it is explicit delegation.
```python
class MyAgent(Agent):
    async def stt_node(self, audio):
        # Agent.default.stt_node runs the standard STT pipeline
        async for event in Agent.default.stt_node(self, audio):
            # Your custom logic wraps around the default
            yield event
```

```typescript
class MyAgent extends Agent {
  async *sttNode(audio: stt.SpeechStream) {
    // Agent.default.sttNode runs the standard STT pipeline
    for await (const event of Agent.default.sttNode(this, audio)) {
      // Your custom logic wraps around the default
      yield event;
    }
  }
}
```

Agent.default gives you three options at every node:
- Wrap — call the default and add logic before or after each yielded item (most common)
- Replace — skip the default entirely and implement the node from scratch
- Conditionally delegate — call the default for some inputs and handle others yourself
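The three options can be sketched with a stand-in "default" node (hypothetical; it just upper-cases items) and plain async generators:

```python
import asyncio

# Stand-in "default" node (hypothetical): upper-cases each item.
async def default_node(items):
    for item in items:
        yield item.upper()

async def wrap_node(items):
    # Wrap: run the default and transform each item it yields.
    async for item in default_node(items):
        yield f"<{item}>"

async def replace_node(items):
    # Replace: skip the default entirely.
    for item in items:
        yield item[::-1]

async def conditional_node(items):
    # Conditionally delegate: default for short items, custom otherwise.
    for item in items:
        if len(item) <= 3:
            async for out in default_node([item]):
                yield out
        else:
            yield item

async def collect(gen):
    return [x async for x in gen]

data = ["hi", "world"]
print(asyncio.run(collect(wrap_node(data))))         # → ['<HI>', '<WORLD>']
print(asyncio.run(collect(replace_node(data))))      # → ['ih', 'dlrow']
print(asyncio.run(collect(conditional_node(data))))  # → ['HI', 'world']
```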
Always pass self when calling Agent.default
In Python, Agent.default.stt_node(self, audio) requires the explicit self parameter because you are calling an unbound method on the default implementation. Forgetting self is the most common mistake when overriding nodes.
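The same mechanics apply to any method accessed on a class rather than an instance in Python, which is a quick way to see why the explicit self is required:

```python
class Base:
    def greet(self, name):
        return f"hello {name}"

class Child(Base):
    def greet(self, name):
        # Accessing greet on the class (not an instance) gives a plain
        # function, so self must be passed explicitly — the same reason
        # Agent.default.stt_node(self, audio) needs self.
        base = Base.greet(self, name)
        return base.upper()

print(Child().greet("world"))  # → HELLO WORLD
```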
Streaming data flow between nodes
The real power of async generators is that the entire pipeline runs concurrently. The LLM does not wait for STT to finish processing all audio — it begins generating a response as soon as the first complete transcript arrives. TTS does not wait for the LLM to finish — it begins synthesizing audio from the first few tokens.
```python
class StreamingAgent(Agent):
    async def stt_node(self, audio):
        async for event in Agent.default.stt_node(self, audio):
            if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
                transcript = event.alternatives[0].text
                print(f"[STT] Final transcript: {transcript}")
            yield event

    async def llm_node(self, chat_ctx):
        token_count = 0
        async for chunk in Agent.default.llm_node(self, chat_ctx):
            token_count += 1
            if token_count == 1:
                print("[LLM] First token received — TTS can begin")
            yield chunk
        print(f"[LLM] Complete: {token_count} tokens")

    async def tts_node(self, text):
        print(f"[TTS] Synthesizing: {text[:50]}...")
        async for audio in Agent.default.tts_node(self, text):
            yield audio
```

```typescript
class StreamingAgent extends Agent {
  async *sttNode(audio: stt.SpeechStream) {
    for await (const event of Agent.default.sttNode(this, audio)) {
      if (event.type === stt.SpeechEventType.FINAL_TRANSCRIPT) {
        const transcript = event.alternatives[0].text;
        console.log(`[STT] Final transcript: ${transcript}`);
      }
      yield event;
    }
  }

  async *llmNode(chatCtx: llm.ChatContext) {
    let tokenCount = 0;
    for await (const chunk of Agent.default.llmNode(this, chatCtx)) {
      tokenCount++;
      if (tokenCount === 1) {
        console.log("[LLM] First token received — TTS can begin");
      }
      yield chunk;
    }
    console.log(`[LLM] Complete: ${tokenCount} tokens`);
  }

  async *ttsNode(text: string) {
    console.log(`[TTS] Synthesizing: ${text.slice(0, 50)}...`);
    for await (const audio of Agent.default.ttsNode(this, text)) {
      yield audio;
    }
  }
}
```

When you run this agent, you will see the log lines interleave — STT produces a transcript, then LLM starts generating while STT continues listening, then TTS starts synthesizing while LLM continues generating. This concurrent streaming is what keeps voice AI latency low.
If the pipeline were synchronous — wait for all STT, then all LLM, then all TTS — the user would wait for the sum of all three stages before hearing anything. With streaming async generators, the user hears the first syllable of the response while the LLM is still generating the rest. This overlap is what makes sub-second response times possible.
What you learned
- The voice pipeline follows a fixed path: Audio In, stt_node, on_user_turn_completed, llm_node, tts_node, Audio Out
- Each node is an async generator that yields streaming data
- Agent.default lets you call the built-in implementation and wrap it with custom logic
- Data flows concurrently through the pipeline — nodes do not wait for upstream nodes to finish
Next up
In the next chapter, you will override your first node: stt_node. You will intercept STT results to add keyword detection and build a profanity filter that flags transcripts before they reach the LLM.