Full assembly
Full pipeline assembly
You have built each pipeline node in isolation: keyword detection in STT, structured output in LLM, emotional speech in TTS, and RAG injection in the turn handler. Now you will combine them into a single chain-of-thought agent that uses every override together, add tests for each node, and wire up metrics and logging. This is where the pieces become a product.
What you'll learn
- How to compose all node overrides into one cohesive Agent class
- How to avoid conflicts when multiple overrides interact
- How to test each node independently and the pipeline as a whole
- How to add metrics and structured logging for production observability
The complete chain-of-thought agent
Each node override calls Agent.default to run the standard behavior, then adds its custom logic around it. When you combine them, the order and interaction patterns matter.
import logging
import time
from livekit.agents import Agent, AgentSession, AgentServer, rtc_session
from livekit.plugins import openai, deepgram, cartesia
from prometheus_client import Counter, Histogram
# Module-level logger shared by every pipeline node override below.
logger = logging.getLogger("chain-of-thought")
# Metrics
# Prometheus instruments exported via prometheus_client: one counter per
# pipeline stage, plus a latency histogram for the whole turn handler.
STT_EVENTS = Counter("cot_stt_events_total", "STT events processed", ["type"])
LLM_CHUNKS = Counter("cot_llm_chunks_total", "LLM chunks generated")
TTS_AUDIO = Counter("cot_tts_audio_chunks_total", "TTS audio chunks produced")
TURN_LATENCY = Histogram("cot_turn_latency_seconds", "Turn processing latency")
class ChainOfThoughtAgent(Agent):
    """Chain-of-thought agent composing every node override from Ch2-Ch5.

    Each override calls ``Agent.default`` to run the framework's standard
    behavior, then layers its own logic (metrics, filtering, RAG) around it.
    """

    def __init__(self):
        super().__init__(
            instructions="""You are a thoughtful assistant that reasons
step by step. Be concise in speech but thorough in reasoning.""",
        )

    async def stt_node(self, audio):
        """Keyword detection from Ch2 — intercept urgent phrases."""
        async for event in Agent.default.stt_node(self, audio):
            STT_EVENTS.labels(type=event.type).inc()
            # Flag urgent keywords for priority handling
            if hasattr(event, "text") and event.text:
                lower = event.text.lower()
                if any(kw in lower for kw in ["emergency", "urgent", "help"]):
                    logger.info(f"Urgent keyword detected: {event.text}")
            yield event

    async def llm_node(self, chat_ctx, tools):
        """Structured output from Ch3 — parse chain-of-thought reasoning."""
        async for chunk in Agent.default.llm_node(self, chat_ctx, tools):
            LLM_CHUNKS.inc()
            # Filter out internal reasoning markers from spoken output.
            # NOTE(review): a tag split across two chunks ("<thi" + "nk>")
            # would slip through — buffer across chunks if that matters.
            if hasattr(chunk, "text") and chunk.text:
                text = chunk.text
                if "<think>" in text or "</think>" in text:
                    logger.debug(f"Filtering reasoning: {text}")
                    continue
            yield chunk

    async def tts_node(self, text):
        """Emotional speech from Ch4 — adjust voice parameters."""
        async for audio in Agent.default.tts_node(self, text):
            # The metric counts "audio chunks produced", so increment once
            # per chunk inside the loop — not once per call.
            TTS_AUDIO.inc()
            yield audio

    async def on_user_turn_completed(self, turn_ctx):
        """RAG injection from Ch5 — enrich context before LLM runs."""
        start = time.monotonic()
        # Inject relevant context from knowledge base
        user_text = turn_ctx.text
        context_docs = await retrieve_relevant_docs(user_text)
        if context_docs:
            context_str = "\n".join(context_docs)
            turn_ctx.add_system_message(
                f"Relevant context:\n{context_str}"
            )
            logger.info(f"Injected {len(context_docs)} context documents")
        await Agent.default.on_user_turn_completed(self, turn_ctx)
        TURN_LATENCY.observe(time.monotonic() - start)
async def retrieve_relevant_docs(query: str) -> list[str]:
    """Retrieve the top-3 most relevant documents for *query* from pgvector.

    In production, replace the connection details with your actual
    vector database (pgvector, Pinecone, Weaviate, etc.).
    See Course 6.1 (RAG for Voice Agents) for full implementations.
    """
    import asyncpg
    import openai

    client = openai.AsyncOpenAI()
    # Generate embedding for the query
    resp = await client.embeddings.create(
        input=query, model="text-embedding-3-small"
    )
    embedding = resp.data[0].embedding
    # Query pgvector for nearest neighbors. Guard with try/finally so the
    # connection is closed even when the fetch raises.
    conn = await asyncpg.connect(dsn="postgresql://localhost/knowledge")
    try:
        rows = await conn.fetch(
            "SELECT content FROM documents ORDER BY embedding <=> $1 LIMIT 3",
            str(embedding),
        )
    finally:
        await conn.close()
    return [row["content"] for row in rows]
server = AgentServer()


@server.rtc_session
async def entrypoint(session: AgentSession):
    """Session entrypoint: wire the agent to concrete STT/LLM/TTS providers."""
    await session.start(
        agent=ChainOfThoughtAgent(),
        room=session.room,
        stt=deepgram.STT(model="nova-3"),
        llm=openai.LLM(model="gpt-4o"),
        tts=cartesia.TTS(voice="<voice-id>"),
    )
if __name__ == "__main__":
    server.run()

import { Agent, AgentServer, AgentSession } from "@livekit/agents";
import { OpenAI } from "@livekit/agents-plugin-openai";
import { Deepgram } from "@livekit/agents-plugin-deepgram";
class ChainOfThoughtAgent extends Agent {
  constructor() {
    super({
      instructions: `You are a thoughtful assistant that reasons
step by step. Be concise in speech but thorough in reasoning.`,
    });
  }

  // Keyword detection (Ch2): flag urgent phrases as transcripts stream by.
  override async *sttNode(audio: AsyncIterable<AudioFrame>) {
    for await (const event of super.sttNode(audio)) {
      if (event.text?.toLowerCase().includes("emergency")) {
        console.log(`Urgent keyword detected: ${event.text}`);
      }
      yield event;
    }
  }

  // Structured output (Ch3): drop chain-of-thought markers from speech.
  override async *llmNode(chatCtx: ChatContext, tools: Tool[]) {
    for await (const chunk of super.llmNode(chatCtx, tools)) {
      // Check the closing tag too, so a chunk containing only "</think>"
      // is also filtered (matches the Python implementation).
      if (chunk.text?.includes("<think>") || chunk.text?.includes("</think>")) {
        continue;
      }
      yield chunk;
    }
  }

  // Emotional speech (Ch4): pass-through hook for voice adjustments.
  override async *ttsNode(text: string) {
    for await (const audio of super.ttsNode(text)) {
      yield audio;
    }
  }

  // RAG injection (Ch5): enrich context before default turn handling runs.
  override async onUserTurnCompleted(turnCtx: TurnContext) {
    const docs = await retrieveRelevantDocs(turnCtx.text);
    if (docs.length > 0) {
      turnCtx.addSystemMessage(`Relevant context:\n${docs.join("\n")}`);
    }
    await super.onUserTurnCompleted(turnCtx);
  }
}
async function retrieveRelevantDocs(query: string): Promise<string[]> {
  // Stub retrieval: echoes the query back as a single pseudo-document.
  // Swap in a real vector-store lookup for production use.
  const docs: string[] = [`Relevant information for: ${query}`];
  return docs;
}
const server = new AgentServer();

// Register the RTC session entrypoint; STT/LLM/TTS use framework defaults.
server.rtcSession(async (session: AgentSession) => {
  await session.start({
    agent: new ChainOfThoughtAgent(),
    room: session.room,
  });
});
server.run();

Every node override follows the same pattern: call the default implementation, add custom logic around it, and yield the results. This keeps each concern isolated while composing them into a unified pipeline. If one override causes issues, you can disable it by removing the override without touching the others.
Testing each node
Test nodes in isolation before testing the full pipeline. Mock the default implementation to verify your custom logic independently.
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from agent import ChainOfThoughtAgent
@pytest.mark.asyncio
async def test_stt_detects_urgent_keywords():
    """The STT override must pass urgent transcripts through unchanged."""
    agent = ChainOfThoughtAgent()
    mock_event = MagicMock()
    mock_event.type = "final_transcript"
    mock_event.text = "I need emergency help"

    # Replace the default STT node so no real provider is contacted.
    async def mock_stt(self, audio):
        yield mock_event

    with patch.object(type(agent).default, "stt_node", mock_stt):
        events = [event async for event in agent.stt_node(iter([]))]

    assert len(events) == 1
    assert events[0].text == "I need emergency help"
@pytest.mark.asyncio
async def test_llm_filters_think_tags():
    """Chunks carrying <think> reasoning markers must never be yielded."""
    agent = ChainOfThoughtAgent()
    visible_chunk = MagicMock(text="The answer is 42.")
    hidden_chunk = MagicMock(text="<think>Let me reason...</think>")

    # Replace the default LLM node with a fixed two-chunk stream.
    async def mock_llm(self, chat_ctx, tools):
        yield hidden_chunk
        yield visible_chunk

    with patch.object(type(agent).default, "llm_node", mock_llm):
        chunks = [chunk async for chunk in agent.llm_node(MagicMock(), [])]

    assert len(chunks) == 1
    assert chunks[0].text == "The answer is 42."
@pytest.mark.asyncio
async def test_rag_injection():
    """The turn handler must inject retrieved docs as a system message."""
    agent = ChainOfThoughtAgent()
    turn_ctx = MagicMock()
    turn_ctx.text = "What is your return policy?"
    turn_ctx.add_system_message = MagicMock()

    # patch() detects the async target and substitutes an AsyncMock, so the
    # agent's `await retrieve_relevant_docs(...)` resolves to the list below.
    with patch("agent.retrieve_relevant_docs", return_value=["Returns accepted within 30 days"]):
        with patch.object(type(agent).default, "on_user_turn_completed", AsyncMock()):
            await agent.on_user_turn_completed(turn_ctx)

    turn_ctx.add_system_message.assert_called_once()
    assert "30 days" in turn_ctx.add_system_message.call_args[0][0]

Test the seams, not the providers
You are not testing whether Deepgram transcribes correctly or GPT-4o reasons well. You are testing that your overrides correctly intercept, transform, and pass through data. Mock the provider responses and verify your logic.
Structured logging
Metrics tell you what is happening in aggregate. Logs tell you what happened in a specific session. Use structured logging so your log aggregator can index and search efficiently.
import logging
import json
class StructuredFormatter(logging.Formatter):
    """Format each log record as one JSON object so aggregators can index it."""

    def format(self, record):
        log_data = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Optional per-record context, attached via logger(..., extra={...}).
        if hasattr(record, "session_id"):
            log_data["session_id"] = record.session_id
        if hasattr(record, "node"):
            log_data["node"] = record.node
        return json.dumps(log_data)
# Configure once at startup
# Attach the JSON-emitting handler to the agent's named logger so every
# record it produces is machine-parseable.
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
logging.getLogger("chain-of-thought").addHandler(handler)
logging.getLogger("chain-of-thought").setLevel(logging.INFO)

Test your knowledge
Question 1 of 2
When combining multiple node overrides in a single agent, what pattern should every override follow?
Course summary
Over the course of this module, you have taken the voice pipeline apart and put it back together:
- Architecture: Understood the async generator pipeline from audio in to audio out
- STT node: Intercepted transcription events for keyword detection and filtering
- LLM node: Parsed structured output and chain-of-thought reasoning
- TTS node: Controlled voice parameters and emotional expression
- Turn handler: Injected RAG context before the LLM generates a response
- Full assembly: Combined every override into one agent with tests and metrics
Each node override is a focused, testable unit of behavior. Together, they give you fine-grained control over every stage of the voice conversation without abandoning the framework's defaults.
The pipeline node pattern is the primary extension mechanism for LiveKit voice agents. Mastering it means you can customize any aspect of the conversational experience while keeping the robust defaults for everything you do not need to change. This is the foundation that every advanced technique in later courses builds on.