Chapter 5

on_user_turn_completed: RAG injection

The on_user_turn_completed hook fires at a critical moment — after the user finishes speaking and the transcript is finalized, but before the LLM generates a response. This makes it the perfect place to inject context from external sources. In this chapter, you will use it to build a RAG (Retrieval-Augmented Generation) integration that searches a vector database and adds relevant knowledge to the conversation before the LLM sees it.

What you'll learn

  • How on_user_turn_completed differs from the streaming node overrides
  • How to use turn_ctx to access and modify the current turn
  • How to inject RAG context from a vector database before the LLM processes
  • How to use StopResponse to block the LLM from responding in certain cases

The on_user_turn_completed hook

Unlike stt_node, llm_node, and tts_node — which are async generators that yield streaming data — on_user_turn_completed is a regular async method. It fires once per user turn, after the user's speech has been fully transcribed.

agent.py (Python)
from livekit.agents import Agent


class MyAgent(Agent):
    async def on_user_turn_completed(self, turn_ctx):
        """Called after the user finishes speaking, before LLM processing."""
        # Access the user's message
        user_message = turn_ctx.user_message

        # Add context, modify the message, or block the response
        # ...

        # Call the default to continue the pipeline
        await Agent.default.on_user_turn_completed(self, turn_ctx)
agent.ts (TypeScript)
import { Agent } from "@livekit/agents";

class MyAgent extends Agent {
  async onUserTurnCompleted(turnCtx: TurnContext) {
    // Access the user's message
    const userMessage = turnCtx.userMessage;

    // Add context, modify the message, or block the response
    // ...

    // Call the default to continue the pipeline
    await Agent.default.onUserTurnCompleted(this, turnCtx);
  }
}

This is a hook, not a generator

Unlike the node methods, on_user_turn_completed does not yield data. It is called once, does its work, and then delegates to Agent.default.on_user_turn_completed to continue the pipeline. If you do not call the default, the LLM will not generate a response.
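
The same distinction exists in plain Python: an async generator yields many values over time, while a regular coroutine runs once and returns. A framework-free sketch of the difference:

```python
import asyncio


async def node_like():
    """Like stt_node/llm_node/tts_node: an async generator that yields a stream."""
    for chunk in ("hel", "lo"):
        yield chunk


async def hook_like(calls: list):
    """Like on_user_turn_completed: a plain coroutine that runs once per turn."""
    calls.append("ran")


async def main():
    chunks = [c async for c in node_like()]  # consumed as a stream
    calls: list = []
    await hook_like(calls)                   # awaited exactly once
    return chunks, calls


chunks, calls = asyncio.run(main())
print(chunks, calls)  # ['hel', 'lo'] ['ran']
```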

The turn_ctx object

The turn_ctx (turn context) gives you access to everything about the current conversational turn:

  • user_message: the transcribed text of what the user said
  • chat_ctx: the full conversation history (all messages so far)
  • add_system_message(): injects a system message into the context for this turn

The add_system_message method is the key to RAG injection. It adds a system-level message to the conversation context that the LLM sees when generating its response, but it does not appear in the user-facing transcript.
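
Conceptually, the injection just appends one more message to the list of messages the LLM receives for this turn. A toy model of the mechanics (ToyTurnContext is an illustrative stand-in, not the framework class):

```python
class ToyTurnContext:
    """Minimal stand-in mimicking the turn_ctx surface used in this chapter."""

    def __init__(self, user_message: str):
        self.user_message = user_message
        self.chat_ctx: list[dict] = []  # messages the LLM will see

    def add_system_message(self, content: str) -> None:
        # Reaches the LLM, but never the user-facing transcript
        self.chat_ctx.append({"role": "system", "content": content})


ctx = ToyTurnContext("How long do refunds take?")
ctx.add_system_message("Relevant context: refunds take 5 business days.")
print(ctx.chat_ctx[0]["role"])  # system
```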

Building a vector search integration

Here is a complete RAG agent that searches a vector database for relevant context and injects it before the LLM processes the user's message.

agent.py (Python)
from livekit.agents import Agent
import logging

logger = logging.getLogger("rag-agent")


class RAGAgent(Agent):
    def __init__(self, knowledge_base):
        super().__init__(
            instructions="""You are a knowledgeable support assistant.
When context is provided, use it to answer accurately.
If the context does not contain the answer, say so honestly.
Do not make up information.""",
        )
        self.knowledge_base = knowledge_base

    async def on_user_turn_completed(self, turn_ctx):
        """Inject relevant context from the knowledge base."""
        user_message = turn_ctx.user_message
        logger.info(f"User message: {user_message}")

        # Search for relevant documents
        results = await self.search_knowledge_base(user_message)

        if results:
            # Format the context for injection
            context_text = self.format_context(results)
            logger.info(f"Injecting {len(results)} context chunks")

            # Add as a system message — LLM sees it, user does not
            turn_ctx.add_system_message(
                f"Relevant context from the knowledge base:\n\n{context_text}"
            )
        else:
            logger.info("No relevant context found")

        # Continue the pipeline
        await Agent.default.on_user_turn_completed(self, turn_ctx)

    async def search_knowledge_base(
        self, query: str, top_k: int = 3, threshold: float = 0.7
    ) -> list[dict]:
        """Search the vector database for relevant documents."""
        # Generate embedding for the query
        query_embedding = await self.knowledge_base.embed(query)

        # Search for similar documents
        results = await self.knowledge_base.search(
            embedding=query_embedding,
            top_k=top_k,
        )

        # Filter by similarity threshold
        return [r for r in results if r["score"] >= threshold]

    def format_context(self, results: list[dict]) -> str:
        """Format search results into a context string."""
        chunks = []
        for i, result in enumerate(results, 1):
            source = result.get("source", "unknown")
            text = result["text"]
            score = result["score"]
            chunks.append(
                f"[Source {i}: {source} (relevance: {score:.2f})]\n{text}"
            )
        return "\n\n".join(chunks)
agent.ts (TypeScript)
import { Agent } from "@livekit/agents";

class RAGAgent extends Agent {
  private knowledgeBase: KnowledgeBase;

  constructor(knowledgeBase: KnowledgeBase) {
    super({
      instructions: `You are a knowledgeable support assistant.
When context is provided, use it to answer accurately.
If the context does not contain the answer, say so honestly.
Do not make up information.`,
    });
    this.knowledgeBase = knowledgeBase;
  }

  async onUserTurnCompleted(turnCtx: TurnContext) {
    const userMessage = turnCtx.userMessage;
    console.log(`User message: ${userMessage}`);

    const results = await this.searchKnowledgeBase(userMessage);

    if (results.length > 0) {
      const contextText = this.formatContext(results);
      console.log(`Injecting ${results.length} context chunks`);

      turnCtx.addSystemMessage(
        `Relevant context from the knowledge base:\n\n${contextText}`
      );
    } else {
      console.log("No relevant context found");
    }

    await Agent.default.onUserTurnCompleted(this, turnCtx);
  }

  private async searchKnowledgeBase(
    query: string,
    topK = 3,
    threshold = 0.7
  ): Promise<SearchResult[]> {
    const queryEmbedding = await this.knowledgeBase.embed(query);

    const results = await this.knowledgeBase.search({
      embedding: queryEmbedding,
      topK,
    });

    return results.filter((r) => r.score >= threshold);
  }

  private formatContext(results: SearchResult[]): string {
    return results
      .map(
        (r, i) =>
          `[Source ${i + 1}: ${r.source ?? "unknown"} (relevance: ${r.score.toFixed(2)})]\n${r.text}`
      )
      .join("\n\n");
  }
}
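
The threshold-filtering and formatting steps above are pure logic, so they are easy to check in isolation. A standalone sketch that mirrors (rather than imports) the agent's helpers, run against hand-made results:

```python
def filter_by_threshold(results: list[dict], threshold: float = 0.7) -> list[dict]:
    """Keep only results whose similarity score meets the threshold."""
    return [r for r in results if r["score"] >= threshold]


def format_context(results: list[dict]) -> str:
    """Number each chunk and label it with its source and relevance score."""
    chunks = [
        f"[Source {i}: {r.get('source', 'unknown')} (relevance: {r['score']:.2f})]\n{r['text']}"
        for i, r in enumerate(results, 1)
    ]
    return "\n\n".join(chunks)


results = [
    {"source": "faq.md", "text": "Refunds take 5 business days.", "score": 0.91},
    {"source": "pricing.md", "text": "Plans start at $10/month.", "score": 0.55},
]
kept = filter_by_threshold(results)  # drops the 0.55 match
print(format_context(kept))
# [Source 1: faq.md (relevance: 0.91)]
# Refunds take 5 business days.
```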
What's happening

The on_user_turn_completed stage is the most natural place for RAG injection. The user has finished speaking, so you have the complete query to search with. The LLM has not started generating yet, so the context arrives in time. And because you use add_system_message, the context appears as authoritative knowledge — not as a user message the LLM might misinterpret.

Connecting to a real vector database

The knowledge_base object in the example above is an abstraction. Here is a concrete implementation that pairs an embedding API with an HTTP-based vector store:

knowledge_base.py (Python)
from livekit.plugins import openai
import aiohttp


class KnowledgeBase:
    def __init__(self, vector_store_url: str):
        self.vector_store_url = vector_store_url
        self.embedder = openai.Embeddings(model="text-embedding-3-small")

    async def embed(self, text: str) -> list[float]:
        """Generate an embedding vector for the given text."""
        return await self.embedder.embed(text)

    async def search(
        self, embedding: list[float], top_k: int = 3
    ) -> list[dict]:
        """Search the vector store for similar documents."""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.vector_store_url}/search",
                json={"embedding": embedding, "top_k": top_k},
            ) as resp:
                data = await resp.json()
                return data["results"]
knowledge-base.ts (TypeScript)
import { openai } from "@livekit/plugins-openai";

class KnowledgeBase {
  private vectorStoreUrl: string;
  private embedder: openai.Embeddings;

  constructor(vectorStoreUrl: string) {
    this.vectorStoreUrl = vectorStoreUrl;
    this.embedder = new openai.Embeddings({ model: "text-embedding-3-small" });
  }

  async embed(text: string): Promise<number[]> {
    return await this.embedder.embed(text);
  }

  async search(params: {
    embedding: number[];
    topK: number;
  }): Promise<SearchResult[]> {
    const resp = await fetch(`${this.vectorStoreUrl}/search`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        embedding: params.embedding,
        top_k: params.topK,
      }),
    });
    const data = await resp.json();
    return data.results;
  }
}
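
The implementation above assumes a hosted vector store behind an HTTP endpoint. For prototyping, the same search interface can be backed by an in-memory index; in this sketch the two-dimensional embeddings are placeholders for real embedding vectors:

```python
import math


class InMemoryVectorStore:
    """Toy vector store: brute-force cosine similarity over an in-memory list."""

    def __init__(self):
        self.docs: list[tuple[list[float], str, str]] = []

    def add(self, embedding: list[float], text: str, source: str) -> None:
        self.docs.append((embedding, text, source))

    def search(self, embedding: list[float], top_k: int = 3) -> list[dict]:
        scored = [
            {"score": self._cosine(embedding, e), "text": t, "source": s}
            for e, t, s in self.docs
        ]
        scored.sort(key=lambda r: r["score"], reverse=True)
        return scored[:top_k]

    @staticmethod
    def _cosine(a: list[float], b: list[float]) -> float:
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(x * x for x in b))
        return dot / (na * nb) if na and nb else 0.0


store = InMemoryVectorStore()
store.add([1.0, 0.0], "Refunds take 5 business days.", "faq.md")
store.add([0.0, 1.0], "Plans start at $10/month.", "pricing.md")
top = store.search([0.9, 0.1], top_k=1)  # query vector close to the first doc
print(top[0]["source"])  # faq.md
```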

Using StopResponse to block responses

Sometimes you want to prevent the LLM from responding at all. The StopResponse exception lets you halt the pipeline in on_user_turn_completed.

agent.py (Python)
from livekit.agents import Agent, StopResponse
import logging

logger = logging.getLogger("gated-agent")


class GatedRAGAgent(Agent):
    """Only responds when relevant context is found."""

    def __init__(self, knowledge_base):
        super().__init__(
            instructions="You are a product support assistant. Only answer questions about our product.",
        )
        self.knowledge_base = knowledge_base

    async def on_user_turn_completed(self, turn_ctx):
        user_message = turn_ctx.user_message

        # search_knowledge_base and format_context as defined in RAGAgent above
        results = await self.search_knowledge_base(user_message)

        if not results:
            logger.info(f"No context found for: {user_message}")
            # Option 1: Stop the response entirely
            # raise StopResponse()

            # Option 2: Let the LLM respond but without context
            turn_ctx.add_system_message(
                "No relevant context was found in the knowledge base. "
                "Politely let the user know you can only answer questions "
                "about our product, and suggest they rephrase their question."
            )
        else:
            context_text = self.format_context(results)
            turn_ctx.add_system_message(
                f"Relevant context:\n\n{context_text}"
            )

        await Agent.default.on_user_turn_completed(self, turn_ctx)
agent.ts (TypeScript)
import { Agent, StopResponse } from "@livekit/agents";

class GatedRAGAgent extends Agent {
  private knowledgeBase: KnowledgeBase;

  constructor(knowledgeBase: KnowledgeBase) {
    super({
      instructions:
        "You are a product support assistant. Only answer questions about our product.",
    });
    this.knowledgeBase = knowledgeBase;
  }

  async onUserTurnCompleted(turnCtx: TurnContext) {
    const userMessage = turnCtx.userMessage;
    // searchKnowledgeBase and formatContext as defined in RAGAgent above
    const results = await this.searchKnowledgeBase(userMessage);

    if (results.length === 0) {
      console.log(`No context found for: ${userMessage}`);
      // Option 1: Stop the response entirely
      // throw new StopResponse();

      // Option 2: Let the LLM respond but without context
      turnCtx.addSystemMessage(
        "No relevant context was found in the knowledge base. " +
          "Politely let the user know you can only answer questions " +
          "about our product, and suggest they rephrase their question."
      );
    } else {
      const contextText = this.formatContext(results);
      turnCtx.addSystemMessage(`Relevant context:\n\n${contextText}`);
    }

    await Agent.default.onUserTurnCompleted(this, turnCtx);
  }
}
1. Search for relevant context. When the user finishes speaking, embed their message and search the vector store for matching documents.

2. Decide whether to respond. If no relevant context is found, you can either raise StopResponse to silence the agent, or inject a system message that tells the LLM to redirect the conversation.

3. Inject context as a system message. When relevant context exists, format it and inject it via add_system_message. The LLM sees it as authoritative context, not user input.

4. Continue the pipeline. Call Agent.default.on_user_turn_completed to hand off to the LLM. If you raised StopResponse, the pipeline halts and no response is generated.

StopResponse is silent

When you raise StopResponse, the user hears nothing. No acknowledgment, no error message, just silence. For a better user experience, consider injecting a redirect message instead of stopping entirely. Reserve StopResponse for cases where responding would be actively harmful — like when the user triggers a content filter.
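
The decision logic in the gated agent can be factored out as a pure function, which keeps the policy testable on its own. In this sketch, the function name and the harmful flag are illustrative, not part of the framework:

```python
def decide_turn_action(results: list[dict], harmful: bool = False) -> str:
    """Map a completed user turn to one of the three outcomes discussed above."""
    if harmful:
        return "stop"      # raise StopResponse: responding would be harmful
    if not results:
        return "redirect"  # inject a "no context" system message instead
    return "inject"        # inject the formatted context, then continue


print(decide_turn_action([], harmful=True))  # stop
print(decide_turn_action([]))                # redirect
print(decide_turn_action([{"score": 0.9}]))  # inject
```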

What you learned

  • on_user_turn_completed is a regular async method, not an async generator
  • turn_ctx provides access to the user's message and the ability to inject system messages
  • RAG injection works by embedding the user's query, searching a vector store, and adding results as system context
  • StopResponse halts the pipeline entirely, preventing the LLM from generating a response

Next up

In the next chapter, you will build content filters at multiple stages of the pipeline — from simple keyword blocklists to LLM-powered content moderation that catches nuanced violations.

Concepts covered
on_user_turn_completed, turn_ctx, StopResponse, RAG