on_user_turn_completed: RAG injection
The on_user_turn_completed hook fires at a critical moment — after the user finishes speaking and the transcript is finalized, but before the LLM generates a response. This makes it the perfect place to inject context from external sources. In this chapter, you will use it to build a RAG (Retrieval-Augmented Generation) integration that searches a vector database and adds relevant knowledge to the conversation before the LLM sees it.
What you'll learn

- How on_user_turn_completed differs from the streaming node overrides
- How to use turn_ctx to access and modify the current turn
- How to inject RAG context from a vector database before the LLM processes the user's message
- How to use StopResponse to block the LLM from responding in certain cases
The on_user_turn_completed hook
Unlike stt_node, llm_node, and tts_node — which are async generators that yield streaming data — on_user_turn_completed is a regular async method. It fires once per user turn, after the user's speech has been fully transcribed.
```python
from livekit.agents import Agent

class MyAgent(Agent):
    async def on_user_turn_completed(self, turn_ctx):
        """Called after the user finishes speaking, before LLM processing."""
        # Access the user's message
        user_message = turn_ctx.user_message

        # Add context, modify the message, or block the response
        # ...

        # Call the default to continue the pipeline
        await Agent.default.on_user_turn_completed(self, turn_ctx)
```

```typescript
import { Agent } from "@livekit/agents";

class MyAgent extends Agent {
  async onUserTurnCompleted(turnCtx: TurnContext) {
    // Access the user's message
    const userMessage = turnCtx.userMessage;

    // Add context, modify the message, or block the response
    // ...

    // Call the default to continue the pipeline
    await Agent.default.onUserTurnCompleted(this, turnCtx);
  }
}
```

This is a hook, not a generator
Unlike the node methods, on_user_turn_completed does not yield data. It is called once, does its work, and then delegates to Agent.default.on_user_turn_completed to continue the pipeline. If you do not call the default, the LLM will not generate a response.
The turn_ctx object
The turn_ctx (turn context) gives you access to everything about the current conversational turn:
| Property or method | Description |
|---|---|
| user_message | The transcribed text of what the user said |
| chat_ctx | The full conversation history (all messages so far) |
| add_system_message() | Inject a system message into the context for this turn |
The add_system_message method is the key to RAG injection. It adds a system-level message to the conversation context that the LLM sees when generating its response, but it does not appear in the user-facing transcript.
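To make the split between the LLM's context and the user-facing transcript concrete, here is a plain-Python sketch of the injection pattern. The dict-based chat context and the helper function are illustrative stand-ins, not the actual LiveKit types:

```python
# Illustrative sketch of context injection; the dict-based chat context
# below is a stand-in, not the actual LiveKit chat-context type.
chat_ctx = [
    {"role": "system", "content": "You are a support assistant."},
    {"role": "user", "content": "How do I reset my password?"},
]

def add_system_message(ctx: list[dict], content: str) -> None:
    """Append a system-level message, mimicking turn_ctx.add_system_message."""
    ctx.append({"role": "system", "content": content})

add_system_message(
    chat_ctx,
    "Relevant context:\n\nPasswords are reset from Settings > Security.",
)

# The LLM now sees three messages; the user-facing transcript still
# shows only the single user turn.
print(len(chat_ctx))  # prints 3
```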
Building a vector search integration
Here is a complete RAG agent that searches a vector database for relevant context and injects it before the LLM processes the user's message.
```python
import logging

from livekit.agents import Agent

logger = logging.getLogger("rag-agent")

class RAGAgent(Agent):
    def __init__(self, knowledge_base):
        super().__init__(
            instructions="""You are a knowledgeable support assistant.
            When context is provided, use it to answer accurately.
            If the context does not contain the answer, say so honestly.
            Do not make up information.""",
        )
        self.knowledge_base = knowledge_base

    async def on_user_turn_completed(self, turn_ctx):
        """Inject relevant context from the knowledge base."""
        user_message = turn_ctx.user_message
        logger.info(f"User message: {user_message}")

        # Search for relevant documents
        results = await self.search_knowledge_base(user_message)

        if results:
            # Format the context for injection
            context_text = self.format_context(results)
            logger.info(f"Injecting {len(results)} context chunks")

            # Add as a system message — LLM sees it, user does not
            turn_ctx.add_system_message(
                f"Relevant context from the knowledge base:\n\n{context_text}"
            )
        else:
            logger.info("No relevant context found")

        # Continue the pipeline
        await Agent.default.on_user_turn_completed(self, turn_ctx)

    async def search_knowledge_base(
        self, query: str, top_k: int = 3, threshold: float = 0.7
    ) -> list[dict]:
        """Search the vector database for relevant documents."""
        # Generate embedding for the query
        query_embedding = await self.knowledge_base.embed(query)

        # Search for similar documents
        results = await self.knowledge_base.search(
            embedding=query_embedding,
            top_k=top_k,
        )

        # Filter by similarity threshold
        return [r for r in results if r["score"] >= threshold]

    def format_context(self, results: list[dict]) -> str:
        """Format search results into a context string."""
        chunks = []
        for i, result in enumerate(results, 1):
            source = result.get("source", "unknown")
            text = result["text"]
            score = result["score"]
            chunks.append(
                f"[Source {i}: {source} (relevance: {score:.2f})]\n{text}"
            )
        return "\n\n".join(chunks)
```

```typescript
import { Agent } from "@livekit/agents";

class RAGAgent extends Agent {
  private knowledgeBase: KnowledgeBase;

  constructor(knowledgeBase: KnowledgeBase) {
    super({
      instructions: `You are a knowledgeable support assistant.
      When context is provided, use it to answer accurately.
      If the context does not contain the answer, say so honestly.
      Do not make up information.`,
    });
    this.knowledgeBase = knowledgeBase;
  }

  async onUserTurnCompleted(turnCtx: TurnContext) {
    const userMessage = turnCtx.userMessage;
    console.log(`User message: ${userMessage}`);

    const results = await this.searchKnowledgeBase(userMessage);

    if (results.length > 0) {
      const contextText = this.formatContext(results);
      console.log(`Injecting ${results.length} context chunks`);

      turnCtx.addSystemMessage(
        `Relevant context from the knowledge base:\n\n${contextText}`
      );
    } else {
      console.log("No relevant context found");
    }

    await Agent.default.onUserTurnCompleted(this, turnCtx);
  }

  private async searchKnowledgeBase(
    query: string,
    topK = 3,
    threshold = 0.7
  ): Promise<SearchResult[]> {
    const queryEmbedding = await this.knowledgeBase.embed(query);

    const results = await this.knowledgeBase.search({
      embedding: queryEmbedding,
      topK,
    });

    return results.filter((r) => r.score >= threshold);
  }

  private formatContext(results: SearchResult[]): string {
    return results
      .map(
        (r, i) =>
          `[Source ${i + 1}: ${r.source ?? "unknown"} (relevance: ${r.score.toFixed(2)})]\n${r.text}`
      )
      .join("\n\n");
  }
}
```

The on_user_turn_completed stage is the most natural place for RAG injection. The user has finished speaking, so you have the complete query to search with. The LLM has not started generating yet, so the context arrives in time. And because you use add_system_message, the context appears as authoritative knowledge — not as a user message the LLM might misinterpret.
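The relevance scores that the threshold filter compares against are typically cosine similarities between the query embedding and each document embedding. Here is a minimal self-contained sketch of that scoring and filtering, using toy two-dimensional vectors in place of real embeddings:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

def filter_by_threshold(results: list[dict], threshold: float = 0.7) -> list[dict]:
    """Drop weak matches, mirroring the score filter in search_knowledge_base."""
    return [r for r in results if r["score"] >= threshold]

# Toy two-dimensional "embeddings", for illustration only
query = [1.0, 0.0]
docs = [
    {"text": "Password reset steps", "embedding": [0.9, 0.1]},
    {"text": "Unrelated changelog entry", "embedding": [0.1, 0.9]},
]
scored = [
    {"text": d["text"], "score": cosine_similarity(query, d["embedding"])}
    for d in docs
]
print([r["text"] for r in filter_by_threshold(scored)])
# prints ['Password reset steps']
```

A real vector store computes the same kind of score over high-dimensional embeddings and returns only the top_k nearest documents; the threshold then discards matches that are nearest but still not close.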
Connecting to a real vector database
The knowledge_base object in the example above is an abstraction. Here is a concrete implementation using a common pattern with an embedding API and a vector store:
```python
import aiohttp

from livekit.plugins import openai

class KnowledgeBase:
    def __init__(self, vector_store_url: str):
        self.vector_store_url = vector_store_url
        self.embedder = openai.Embeddings(model="text-embedding-3-small")

    async def embed(self, text: str) -> list[float]:
        """Generate an embedding vector for the given text."""
        result = await self.embedder.embed(text)
        return result

    async def search(
        self, embedding: list[float], top_k: int = 3
    ) -> list[dict]:
        """Search the vector store for similar documents."""
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{self.vector_store_url}/search",
                json={"embedding": embedding, "top_k": top_k},
            ) as resp:
                data = await resp.json()
                return data["results"]
```

```typescript
import { openai } from "@livekit/plugins-openai";

class KnowledgeBase {
  private vectorStoreUrl: string;
  private embedder: openai.Embeddings;

  constructor(vectorStoreUrl: string) {
    this.vectorStoreUrl = vectorStoreUrl;
    this.embedder = new openai.Embeddings({ model: "text-embedding-3-small" });
  }

  async embed(text: string): Promise<number[]> {
    return await this.embedder.embed(text);
  }

  async search(params: {
    embedding: number[];
    topK: number;
  }): Promise<SearchResult[]> {
    const resp = await fetch(`${this.vectorStoreUrl}/search`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        embedding: params.embedding,
        top_k: params.topK,
      }),
    });
    const data = await resp.json();
    return data.results;
  }
}
```

Using StopResponse to block responses
Sometimes you want to prevent the LLM from responding at all. The StopResponse exception lets you halt the pipeline in on_user_turn_completed.
```python
import logging

from livekit.agents import Agent, StopResponse

logger = logging.getLogger("gated-agent")

class GatedRAGAgent(Agent):
    """Only responds when relevant context is found.

    search_knowledge_base and format_context are as defined in
    RAGAgent above.
    """

    def __init__(self, knowledge_base):
        super().__init__(
            instructions="You are a product support assistant. Only answer questions about our product.",
        )
        self.knowledge_base = knowledge_base

    async def on_user_turn_completed(self, turn_ctx):
        user_message = turn_ctx.user_message
        results = await self.search_knowledge_base(user_message)

        if not results:
            logger.info(f"No context found for: {user_message}")
            # Option 1: Stop the response entirely
            # raise StopResponse()

            # Option 2: Let the LLM respond but without context
            turn_ctx.add_system_message(
                "No relevant context was found in the knowledge base. "
                "Politely let the user know you can only answer questions "
                "about our product, and suggest they rephrase their question."
            )
        else:
            context_text = self.format_context(results)
            turn_ctx.add_system_message(
                f"Relevant context:\n\n{context_text}"
            )

        await Agent.default.on_user_turn_completed(self, turn_ctx)
```

```typescript
import { Agent, StopResponse } from "@livekit/agents";

// searchKnowledgeBase and formatContext are as defined in RAGAgent above.
class GatedRAGAgent extends Agent {
  private knowledgeBase: KnowledgeBase;

  constructor(knowledgeBase: KnowledgeBase) {
    super({
      instructions:
        "You are a product support assistant. Only answer questions about our product.",
    });
    this.knowledgeBase = knowledgeBase;
  }

  async onUserTurnCompleted(turnCtx: TurnContext) {
    const userMessage = turnCtx.userMessage;
    const results = await this.searchKnowledgeBase(userMessage);

    if (results.length === 0) {
      console.log(`No context found for: ${userMessage}`);
      // Option 1: Stop the response entirely
      // throw new StopResponse();

      // Option 2: Let the LLM respond but without context
      turnCtx.addSystemMessage(
        "No relevant context was found in the knowledge base. " +
          "Politely let the user know you can only answer questions " +
          "about our product, and suggest they rephrase their question."
      );
    } else {
      const contextText = this.formatContext(results);
      turnCtx.addSystemMessage(`Relevant context:\n\n${contextText}`);
    }

    await Agent.default.onUserTurnCompleted(this, turnCtx);
  }
}
```

The gated flow breaks down into four steps:

1. **Search for relevant context.** When the user finishes speaking, embed their message and search the vector store for matching documents.
2. **Decide whether to respond.** If no relevant context is found, you can either raise StopResponse to silence the agent, or inject a system message that tells the LLM to redirect the conversation.
3. **Inject context as a system message.** When relevant context exists, format it and inject it via add_system_message. The LLM sees it as authoritative context, not user input.
4. **Continue the pipeline.** Call Agent.default.on_user_turn_completed to hand off to the LLM. If you raised StopResponse, the pipeline halts and no response is generated.
StopResponse is silent
When you raise StopResponse, the user hears nothing. No acknowledgment, no error message, just silence. For a better user experience, consider injecting a redirect message instead of stopping entirely. Reserve StopResponse for cases where responding would be actively harmful — like when the user triggers a content filter.
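This trade-off can be captured as a small decision helper. Everything below is illustrative: the blocklist and the action names are hypothetical, and the return value merely signals what the real hook would do (raise StopResponse, inject a redirect message, or inject retrieved context):

```python
# Hypothetical decision helper; the blocklist and action names are
# illustrative, not part of the LiveKit API.
BLOCKED_TERMS = {"malware", "credit card dump"}

def choose_action(user_message: str, results: list[dict]) -> str:
    """Return 'stop', 'redirect', or 'respond' for a finished user turn."""
    text = user_message.lower()
    if any(term in text for term in BLOCKED_TERMS):
        # Responding would be actively harmful: raise StopResponse()
        return "stop"
    if not results:
        # No context found: inject a redirect system message, not silence
        return "redirect"
    # Context found: inject it and let the LLM answer
    return "respond"

print(choose_action("how do I reset my password?", []))  # prints redirect
```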
What you learned
- on_user_turn_completed is a regular async method, not an async generator
- turn_ctx provides access to the user's message and the ability to inject system messages
- RAG injection works by embedding the user's query, searching a vector store, and adding results as system context
- StopResponse halts the pipeline entirely, preventing the LLM from generating a response
Next up
In the next chapter, you will build content filters at multiple stages of the pipeline — from simple keyword blocklists to LLM-powered content moderation that catches nuanced violations.