Chapter 820m

Full assembly

Full pipeline assembly

You have built each pipeline node in isolation: keyword detection in STT, structured output in LLM, emotional speech in TTS, and RAG injection in the turn handler. Now you will combine them into a single chain-of-thought agent that uses every override together, add tests for each node, and wire up metrics and logging. This is where the pieces become a product.

Complete pipelineTestsMetrics

What you'll learn

  • How to compose all node overrides into one cohesive Agent class
  • How to avoid conflicts when multiple overrides interact
  • How to test each node independently and the pipeline as a whole
  • How to add metrics and structured logging for production observability

The complete chain-of-thought agent

Each node override calls Agent.default to run the standard behavior, then adds its custom logic around it. When you combine them, the order and interaction patterns matter.

agent.pypython
import logging
import time
from livekit.agents import Agent, AgentSession, AgentServer, rtc_session
from livekit.plugins import openai, deepgram, cartesia
from prometheus_client import Counter, Histogram

logger = logging.getLogger("chain-of-thought")

# Metrics
STT_EVENTS = Counter("cot_stt_events_total", "STT events processed", ["type"])
LLM_CHUNKS = Counter("cot_llm_chunks_total", "LLM chunks generated")
TTS_AUDIO = Counter("cot_tts_audio_chunks_total", "TTS audio chunks produced")
TURN_LATENCY = Histogram("cot_turn_latency_seconds", "Turn processing latency")


class ChainOfThoughtAgent(Agent):
  def __init__(self):
      super().__init__(
          instructions="""You are a thoughtful assistant that reasons
          step by step. Be concise in speech but thorough in reasoning.""",
      )

  async def stt_node(self, audio):
      """Keyword detection from Ch2 — intercept urgent phrases."""
      async for event in Agent.default.stt_node(self, audio):
          STT_EVENTS.labels(type=event.type).inc()

          # Flag urgent keywords for priority handling
          if hasattr(event, "text") and event.text:
              lower = event.text.lower()
              if any(kw in lower for kw in ["emergency", "urgent", "help"]):
                  logger.info(f"Urgent keyword detected: {event.text}")
          yield event

  async def llm_node(self, chat_ctx, tools):
      """Structured output from Ch3 — parse chain-of-thought reasoning."""
      async for chunk in Agent.default.llm_node(self, chat_ctx, tools):
          LLM_CHUNKS.inc()

          # Filter out internal reasoning markers from spoken output
          if hasattr(chunk, "text") and chunk.text:
              text = chunk.text
              if "<think>" in text or "</think>" in text:
                  logger.debug(f"Filtering reasoning: {text}")
                  continue
          yield chunk

  async def tts_node(self, text):
      """Emotional speech from Ch4 — adjust voice parameters."""
      TTS_AUDIO.inc()
      async for audio in Agent.default.tts_node(self, text):
          yield audio

  async def on_user_turn_completed(self, turn_ctx):
      """RAG injection from Ch5 — enrich context before LLM runs."""
      start = time.monotonic()

      # Inject relevant context from knowledge base
      user_text = turn_ctx.text
      context_docs = await retrieve_relevant_docs(user_text)
      if context_docs:
          context_str = "\n".join(context_docs)
          turn_ctx.add_system_message(
              f"Relevant context:\n{context_str}"
          )
          logger.info(f"Injected {len(context_docs)} context documents")

      await Agent.default.on_user_turn_completed(self, turn_ctx)
      TURN_LATENCY.observe(time.monotonic() - start)


async def retrieve_relevant_docs(query: str) -> list[str]:
  """Retrieve documents from a vector store.

  In production, replace the connection details with your actual
  vector database (pgvector, Pinecone, Weaviate, etc.).
  See Course 6.1 (RAG for Voice Agents) for full implementations.
  """
  import asyncpg
  import openai

  client = openai.AsyncOpenAI()

  # Generate embedding for the query
  resp = await client.embeddings.create(
      input=query, model="text-embedding-3-small"
  )
  embedding = resp.data[0].embedding

  # Query pgvector for nearest neighbors
  conn = await asyncpg.connect(dsn="postgresql://localhost/knowledge")
  rows = await conn.fetch(
      "SELECT content FROM documents ORDER BY embedding <=> $1 LIMIT 3",
      str(embedding),
  )
  await conn.close()

  return [row["content"] for row in rows]


server = AgentServer()


@server.rtc_session
async def entrypoint(session: AgentSession):
  await session.start(
      agent=ChainOfThoughtAgent(),
      room=session.room,
      stt=deepgram.STT(model="nova-3"),
      llm=openai.LLM(model="gpt-4o"),
      tts=cartesia.TTS(voice="<voice-id>"),
  )


if __name__ == "__main__":
  server.run()
agent.tstypescript
import { Agent, AgentServer, AgentSession } from "@livekit/agents";
import { OpenAI } from "@livekit/agents-plugin-openai";
import { Deepgram } from "@livekit/agents-plugin-deepgram";

class ChainOfThoughtAgent extends Agent {
constructor() {
  super({
    instructions: `You are a thoughtful assistant that reasons
      step by step. Be concise in speech but thorough in reasoning.`,
  });
}

override async *sttNode(audio: AsyncIterable<AudioFrame>) {
  for await (const event of super.sttNode(audio)) {
    if (event.text?.toLowerCase().includes("emergency")) {
      console.log(`Urgent keyword detected: ${event.text}`);
    }
    yield event;
  }
}

override async *llmNode(chatCtx: ChatContext, tools: Tool[]) {
  for await (const chunk of super.llmNode(chatCtx, tools)) {
    if (chunk.text?.includes("<think>")) continue;
    yield chunk;
  }
}

override async *ttsNode(text: string) {
  for await (const audio of super.ttsNode(text)) {
    yield audio;
  }
}

override async onUserTurnCompleted(turnCtx: TurnContext) {
  const docs = await retrieveRelevantDocs(turnCtx.text);
  if (docs.length > 0) {
    turnCtx.addSystemMessage(`Relevant context:\n${docs.join("\n")}`);
  }
  await super.onUserTurnCompleted(turnCtx);
}
}

async function retrieveRelevantDocs(query: string): Promise<string[]> {
return [`Relevant information for: ${query}`];
}

const server = new AgentServer();

server.rtcSession(async (session: AgentSession) => {
await session.start({
  agent: new ChainOfThoughtAgent(),
  room: session.room,
});
});

server.run();
What's happening

Every node override follows the same pattern: call the default implementation, add custom logic around it, and yield the results. This keeps each concern isolated while composing them into a unified pipeline. If one override causes issues, you can disable it by removing the override without touching the others.

Testing each node

Test nodes in isolation before testing the full pipeline. Mock the default implementation to verify your custom logic independently.

tests/test_pipeline.pypython
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from agent import ChainOfThoughtAgent


@pytest.mark.asyncio
async def test_stt_detects_urgent_keywords():
  agent = ChainOfThoughtAgent()
  mock_event = MagicMock()
  mock_event.type = "final_transcript"
  mock_event.text = "I need emergency help"

  async def mock_stt(self, audio):
      yield mock_event

  with patch.object(type(agent).default, "stt_node", mock_stt):
      events = []
      async for event in agent.stt_node(iter([])):
          events.append(event)

  assert len(events) == 1
  assert events[0].text == "I need emergency help"


@pytest.mark.asyncio
async def test_llm_filters_think_tags():
  agent = ChainOfThoughtAgent()
  visible_chunk = MagicMock(text="The answer is 42.")
  hidden_chunk = MagicMock(text="<think>Let me reason...</think>")

  async def mock_llm(self, chat_ctx, tools):
      yield hidden_chunk
      yield visible_chunk

  with patch.object(type(agent).default, "llm_node", mock_llm):
      chunks = []
      async for chunk in agent.llm_node(MagicMock(), []):
          chunks.append(chunk)

  assert len(chunks) == 1
  assert chunks[0].text == "The answer is 42."


@pytest.mark.asyncio
async def test_rag_injection():
  agent = ChainOfThoughtAgent()
  turn_ctx = MagicMock()
  turn_ctx.text = "What is your return policy?"
  turn_ctx.add_system_message = MagicMock()

  with patch("agent.retrieve_relevant_docs", return_value=["Returns accepted within 30 days"]):
      with patch.object(type(agent).default, "on_user_turn_completed", AsyncMock()):
          await agent.on_user_turn_completed(turn_ctx)

  turn_ctx.add_system_message.assert_called_once()
  assert "30 days" in turn_ctx.add_system_message.call_args[0][0]

Test the seams, not the providers

You are not testing whether Deepgram transcribes correctly or GPT-4o reasons well. You are testing that your overrides correctly intercept, transform, and pass through data. Mock the provider responses and verify your logic.

Structured logging

Metrics tell you what is happening in aggregate. Logs tell you what happened in a specific session. Use structured logging so your log aggregator can index and search efficiently.

logging_config.pypython
import logging
import json

class StructuredFormatter(logging.Formatter):
  def format(self, record):
      log_data = {
          "timestamp": self.formatTime(record),
          "level": record.levelname,
          "logger": record.name,
          "message": record.getMessage(),
      }
      if hasattr(record, "session_id"):
          log_data["session_id"] = record.session_id
      if hasattr(record, "node"):
          log_data["node"] = record.node
      return json.dumps(log_data)

# Configure once at startup
handler = logging.StreamHandler()
handler.setFormatter(StructuredFormatter())
logging.getLogger("chain-of-thought").addHandler(handler)
logging.getLogger("chain-of-thought").setLevel(logging.INFO)

Test your knowledge

Question 1 of 2

When combining multiple node overrides in a single agent, what pattern should every override follow?

Course summary

Over the course of this module, you have taken the voice pipeline apart and put it back together:

  • Architecture: Understood the async generator pipeline from audio in to audio out
  • STT node: Intercepted transcription events for keyword detection and filtering
  • LLM node: Parsed structured output and chain-of-thought reasoning
  • TTS node: Controlled voice parameters and emotional expression
  • Turn handler: Injected RAG context before the LLM generates a response
  • Full assembly: Combined every override into one agent with tests and metrics

Each node override is a focused, testable unit of behavior. Together, they give you fine-grained control over every stage of the voice conversation without abandoning the framework's defaults.

What's happening

The pipeline node pattern is the primary extension mechanism for LiveKit voice agents. Mastering it means you can customize any aspect of the conversational experience while keeping the robust defaults for everything you do not need to change. This is the foundation that every advanced technique in later courses builds on.

Concepts covered
Complete pipelineTestsMetrics