Chapter 120m

The voice pipeline architecture

Every voice agent you have built so far uses a pipeline that LiveKit wires together behind the scenes. In this chapter, you will crack open that pipeline, understand each node, and learn why every node is an async generator you can override to inject custom logic at any stage.

What you'll learn

  • The full pipeline path from microphone audio to speaker output
  • How each node is an async generator that yields streaming data
  • How Agent.default wires the default pipeline together
  • How data flows between nodes in a streaming fashion

The full pipeline

When a user speaks to your voice agent, their audio travels through a series of processing stages before a response reaches their ears. Here is the complete path:

Voice pipeline:

Audio In → stt_node → on_user_turn_completed → llm_node → tts_node → Audio Out

Each stage is a node — a method on your Agent class that you can override. The pipeline is not a batch process. Data streams through each node as it becomes available: STT yields partial transcripts, the LLM yields tokens one at a time, and TTS begins synthesizing audio before the LLM finishes its response.
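The chaining itself is plain language-level machinery: each stage is an async generator that consumes the previous one. Here is a framework-free sketch with two simulated stages — the stage names are invented for illustration, and none of this is LiveKit code:

```python
import asyncio


async def stt_stage():
    # Simulated STT: yields transcript fragments as they "arrive"
    for fragment in ["hello", "world"]:
        await asyncio.sleep(0)  # yield control, like awaiting real audio
        yield fragment


async def llm_stage(transcripts):
    # Simulated LLM: responds to each fragment as soon as it arrives
    async for text in transcripts:
        yield f"echo: {text}"


async def main():
    # Chain the stages; data streams through without buffering the whole input
    return [reply async for reply in llm_stage(stt_stage())]


print(asyncio.run(main()))  # ['echo: hello', 'echo: world']
```

Each `yield` hands one item downstream immediately; the list comprehension at the end is only there to collect the results for printing.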

  1. Audio In — Raw audio frames arrive from the user's microphone via WebRTC. LiveKit handles audio capture, encoding, and transport. By the time audio reaches your agent, it is a stream of PCM audio frames.
  2. stt_node — The STT node receives the audio stream and yields speech events — interim transcripts, final transcripts, and speech boundary markers. You override this node to intercept, filter, or augment transcription results before they reach the LLM.
  3. on_user_turn_completed — Once a complete user turn is detected (the user has finished speaking), this hook fires. It receives a turn context containing the user's message and the conversation history. This is where you inject RAG context, validate input, or decide whether to respond at all.
  4. llm_node — The LLM node receives the chat context and yields response chunks as the model generates tokens. You override this node to parse structured output, add chain-of-thought reasoning, or transform the response format.
  5. tts_node — The TTS node receives text and yields synthesized audio chunks. You override this node to change voice parameters, inject SSML, or route to different TTS providers based on content.
  6. Audio Out — Synthesized audio frames are sent back to the user via WebRTC. LiveKit handles encoding, transport, and playback.
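As a concrete illustration of step 3, here is a framework-free sketch of the hook pattern. The `TurnContext` class and `fake_retrieve` helper are stand-ins invented for this example — check the LiveKit docs for your SDK version's actual hook signature:

```python
import asyncio


class TurnContext:
    """Stand-in for the turn context: conversation history plus a helper."""
    def __init__(self):
        self.messages = []

    def add_message(self, role, content):
        self.messages.append({"role": role, "content": content})


def fake_retrieve(query):
    # Hypothetical RAG lookup — replace with a real retriever
    return f"docs matching {query!r}"


class RagAgent:
    async def on_user_turn_completed(self, turn_ctx, user_text):
        # Inject retrieved context so the LLM node sees it in the history
        turn_ctx.add_message("system", fake_retrieve(user_text))
        turn_ctx.add_message("user", user_text)


ctx = TurnContext()
asyncio.run(RagAgent().on_user_turn_completed(ctx, "reset my password"))
print(ctx.messages[0]["role"])  # system — RAG context lands before the user turn
```

Because the hook runs after the turn is complete but before llm_node, anything it appends to the context is visible to the LLM on this turn.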

What's happening

Think of the pipeline as a series of garden hoses connected end to end. Water (data) enters the first hose and flows through each one. An async generator is a hose that transforms the water as it passes through — it receives a stream on one end and produces a stream on the other. You can splice in a new hose at any point to filter, enrich, or redirect the flow without disrupting the rest of the chain.

Async generators: the streaming primitive

Every pipeline node is an async generator — a Python function (or TypeScript equivalent) that uses yield to produce values one at a time rather than returning a single result. This is what makes the pipeline truly streaming.

agent.py (Python)
from livekit.agents import Agent, stt, llm, tts
import typing


class MyAgent(Agent):
  def __init__(self):
      super().__init__(
          instructions="You are a helpful assistant."
      )

  async def stt_node(
      self, audio: stt.SpeechStream
  ) -> typing.AsyncGenerator[stt.SpeechEvent, None]:
      """Each speech event is yielded as it arrives — no buffering."""
      async for event in Agent.default.stt_node(self, audio):
          # You can inspect, modify, or filter each event here
          yield event

  async def llm_node(
      self, chat_ctx: llm.ChatContext
  ) -> typing.AsyncGenerator[llm.ChatChunk, None]:
      """Each token chunk is yielded as the LLM generates it."""
      async for chunk in Agent.default.llm_node(self, chat_ctx):
          yield chunk

  async def tts_node(
      self, text: str
  ) -> typing.AsyncGenerator[tts.SynthesizedAudio, None]:
      """Each audio chunk is yielded as TTS synthesizes it."""
      async for audio in Agent.default.tts_node(self, text):
          yield audio

agent.ts (TypeScript)
import { Agent, stt, llm, tts } from "@livekit/agents";

class MyAgent extends Agent {
  constructor() {
    super({ instructions: "You are a helpful assistant." });
  }

  async *sttNode(
    audio: stt.SpeechStream
  ): AsyncGenerator<stt.SpeechEvent> {
    for await (const event of Agent.default.sttNode(this, audio)) {
      // Inspect, modify, or filter each event here
      yield event;
    }
  }

  async *llmNode(
    chatCtx: llm.ChatContext
  ): AsyncGenerator<llm.ChatChunk> {
    for await (const chunk of Agent.default.llmNode(this, chatCtx)) {
      yield chunk;
    }
  }

  async *ttsNode(
    text: string
  ): AsyncGenerator<tts.SynthesizedAudio> {
    for await (const audio of Agent.default.ttsNode(this, text)) {
      yield audio;
    }
  }
}

The pattern is always the same: call Agent.default.<node_name> to invoke the default implementation, iterate over its output, and yield each item — optionally transforming it along the way.

Async generators are lazy

An async generator does not execute until something iterates over it. The pipeline orchestrator pulls from each node on demand, which means data flows naturally without buffering. If the TTS node is slow to consume, the LLM node pauses automatically — backpressure is built in.

Agent.default: the escape hatch

Every node override follows a critical pattern: calling Agent.default to invoke the built-in behavior. This is not inheritance — it is explicit delegation.

agent.py (Python)
class MyAgent(Agent):
  async def stt_node(self, audio):
      # Agent.default.stt_node runs the standard STT pipeline
      async for event in Agent.default.stt_node(self, audio):
          # Your custom logic wraps around the default
          yield event

agent.ts (TypeScript)
class MyAgent extends Agent {
  async *sttNode(audio: stt.SpeechStream) {
    // Agent.default.sttNode runs the standard STT pipeline
    for await (const event of Agent.default.sttNode(this, audio)) {
      // Your custom logic wraps around the default
      yield event;
    }
  }
}

Agent.default gives you three options at every node:

  1. Wrap — call the default and add logic before or after each yielded item (most common)
  2. Replace — skip the default entirely and implement the node from scratch
  3. Conditionally delegate — call the default for some inputs and handle others yourself
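The three options in miniature, with a stand-in `default_node` playing the role of `Agent.default.<node>` (a framework-free sketch, not LiveKit code):

```python
import asyncio


async def default_node(items):
    # Stand-in for Agent.default.<node>: just passes items through
    for item in items:
        yield item


async def wrapped(items):
    # 1. Wrap: delegate, then transform each yielded item
    async for item in default_node(items):
        yield item.upper()


async def replaced(items):
    # 2. Replace: skip the default entirely
    yield "custom output"


async def conditional(items):
    # 3. Conditionally delegate: handle some inputs yourself
    async for item in default_node(items):
        yield "filtered" if item.startswith("!") else item


async def collect(agen):
    return [x async for x in agen]


print(asyncio.run(collect(wrapped(["hi"]))))            # ['HI']
print(asyncio.run(collect(conditional(["!x", "ok"]))))  # ['filtered', 'ok']
```

All three remain async generators from the orchestrator's point of view, so downstream nodes never need to know which strategy you chose.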

Always pass self when calling Agent.default

In Python, Agent.default.stt_node(self, audio) requires the explicit self parameter because you are calling an unbound method on the default implementation. Forgetting self is the most common mistake when overriding nodes.

Streaming data flow between nodes

The real power of async generators is that the entire pipeline runs concurrently. The LLM does not wait for STT to finish processing all audio — it begins generating a response as soon as the first complete transcript arrives. TTS does not wait for the LLM to finish — it begins synthesizing audio from the first few tokens.

agent.py (Python)
class StreamingAgent(Agent):
  async def stt_node(self, audio):
      async for event in Agent.default.stt_node(self, audio):
          if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
              transcript = event.alternatives[0].text
              print(f"[STT] Final transcript: {transcript}")
          yield event

  async def llm_node(self, chat_ctx):
      token_count = 0
      async for chunk in Agent.default.llm_node(self, chat_ctx):
          token_count += 1
          if token_count == 1:
              print("[LLM] First token received — TTS can begin")
          yield chunk
      print(f"[LLM] Complete: {token_count} tokens")

  async def tts_node(self, text):
      print(f"[TTS] Synthesizing: {text[:50]}...")
      async for audio in Agent.default.tts_node(self, text):
          yield audio

agent.ts (TypeScript)
class StreamingAgent extends Agent {
  async *sttNode(audio: stt.SpeechStream) {
    for await (const event of Agent.default.sttNode(this, audio)) {
      if (event.type === stt.SpeechEventType.FINAL_TRANSCRIPT) {
        const transcript = event.alternatives[0].text;
        console.log(`[STT] Final transcript: ${transcript}`);
      }
      yield event;
    }
  }

  async *llmNode(chatCtx: llm.ChatContext) {
    let tokenCount = 0;
    for await (const chunk of Agent.default.llmNode(this, chatCtx)) {
      tokenCount++;
      if (tokenCount === 1) {
        console.log("[LLM] First token received — TTS can begin");
      }
      yield chunk;
    }
    console.log(`[LLM] Complete: ${tokenCount} tokens`);
  }

  async *ttsNode(text: string) {
    console.log(`[TTS] Synthesizing: ${text.slice(0, 50)}...`);
    for await (const audio of Agent.default.ttsNode(this, text)) {
      yield audio;
    }
  }
}

When you run this agent, you will see the log lines interleave — STT produces a transcript, then LLM starts generating while STT continues listening, then TTS starts synthesizing while LLM continues generating. This concurrent streaming is what keeps voice AI latency low.

What's happening

If the pipeline were synchronous — wait for all STT, then all LLM, then all TTS — the user would wait for the sum of all three stages before hearing anything. With streaming async generators, the user hears the first syllable of the response while the LLM is still generating the rest. This overlap is what makes sub-second response times possible.
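The overlap can be demonstrated with two simulated stages. The recorded event order shows "synthesis" starting on the first token, before "generation" finishes (a sketch with invented stage names, not LiveKit code):

```python
import asyncio

events = []


async def fake_llm():
    # Simulated LLM: emits one token at a time
    for token in ["Hel", "lo ", "there"]:
        await asyncio.sleep(0)  # simulated generation delay
        events.append(f"llm:{token}")
        yield token


async def fake_tts(tokens):
    # Simulated TTS: begins "synthesis" per token as it arrives
    async for token in tokens:
        events.append(f"tts:{token}")
        yield f"audio({token})"


async def main():
    async for _ in fake_tts(fake_llm()):
        pass


asyncio.run(main())
# TTS handled the first token before the LLM emitted its last one
assert events.index("tts:Hel") < events.index("llm:there")
print(events)
```

A strictly sequential pipeline would log every `llm:` event before the first `tts:` event; here they alternate, which is the overlap that hides each stage's latency behind the others.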

Test your knowledge

Why is the voice pipeline able to achieve sub-second response times despite having three separate models (STT, LLM, TTS) in sequence?

What you learned

  • The voice pipeline follows a fixed path: Audio In, stt_node, on_user_turn_completed, llm_node, tts_node, Audio Out
  • Each node is an async generator that yields streaming data
  • Agent.default lets you call the built-in implementation and wrap it with custom logic
  • Data flows concurrently through the pipeline — nodes do not wait for upstream nodes to finish

Next up

In the next chapter, you will override your first node: stt_node. You will intercept STT results to add keyword detection and build a profanity filter that flags transcripts before they reach the LLM.

Concepts covered
Pipeline diagram · Async generators · Agent.default