# stt_node: audio preprocessing
The `stt_node` is the first override point in the voice pipeline. Every word the user speaks passes through it, which makes it the ideal place to intercept, filter, or augment transcripts before the LLM ever sees them. In this chapter, you will override `stt_node` to add keyword detection and build a profanity filter.
## What you'll learn

- The async generator signature of `stt_node` and the events it yields
- How to intercept and inspect STT results in the pipeline
- How to build keyword detection for trigger phrases
- How to build a profanity detector that flags or blocks transcripts
## The stt_node signature

The `stt_node` receives a `SpeechStream` (a stream of audio data) and yields `SpeechEvent` objects. Each event represents something the STT engine detected: an interim transcript, a final transcript, or a speech boundary marker like the start or end of speech.
**Python**

```python
from livekit.agents import Agent, stt
import typing


class MyAgent(Agent):
    async def stt_node(
        self, audio: stt.SpeechStream
    ) -> typing.AsyncGenerator[stt.SpeechEvent, None]:
        async for event in Agent.default.stt_node(self, audio):
            yield event
```

**TypeScript**

```typescript
import { Agent, stt } from "@livekit/agents";

class MyAgent extends Agent {
  async *sttNode(
    audio: stt.SpeechStream
  ): AsyncGenerator<stt.SpeechEvent> {
    for await (const event of Agent.default.sttNode(this, audio)) {
      yield event;
    }
  }
}
```

The key event types you will work with:
| Event Type | Description |
|---|---|
| `INTERIM_TRANSCRIPT` | Partial transcript that may change as more audio arrives |
| `FINAL_TRANSCRIPT` | Confirmed transcript; the STT engine is confident in this text |
| `SPEECH_STARTED` | The user began speaking |
| `END_OF_SPEECH` | The user stopped speaking |
### Final vs interim transcripts

Interim transcripts update rapidly as the user speaks and frequently change. Final transcripts are stable: the STT engine has committed to them. Always use `FINAL_TRANSCRIPT` events for logic that makes decisions based on what the user said. Interim transcripts are useful for UI feedback but unreliable for filtering.
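To see why only final transcripts should drive decisions, here is a standalone sketch with no LiveKit dependencies (the `FakeEvent` type and the sample transcripts are invented for the demo). It shows interim guesses churning before a single final commit:

```python
from dataclasses import dataclass


@dataclass
class FakeEvent:
    # Stand-in for stt.SpeechEvent: just a type tag and a transcript.
    type: str  # "interim" or "final"
    text: str


# A typical STT stream: interim guesses churn, then one final commit.
events = [
    FakeEvent("interim", "can"),
    FakeEvent("interim", "cancel my"),
    FakeEvent("interim", "cancel my a count"),  # note the misrecognition
    FakeEvent("final", "cancel my account"),
]


def decide(events):
    """Act only on final transcripts; interims are display-only."""
    for event in events:
        if event.type == "final":
            return event.text  # safe to base decisions on this
    return None


print(decide(events))  # -> cancel my account
```

Acting on the interim events here would have triggered logic on "cancel my a count", which a trigger-phrase match would miss; the final event is the only one worth filtering on.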
## Keyword detection
A common use case is detecting specific trigger phrases and taking action before the LLM processes the message. For example, you might want to detect when a user says "transfer me to a human" and handle it with custom logic.
**Python**

```python
from livekit.agents import Agent, stt
import typing
import logging

logger = logging.getLogger("keyword-agent")

TRIGGER_PHRASES = [
    "transfer me",
    "speak to a human",
    "real person",
    "cancel my account",
]


class KeywordAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="You are a helpful customer service assistant."
        )

    async def stt_node(
        self, audio: stt.SpeechStream
    ) -> typing.AsyncGenerator[stt.SpeechEvent, None]:
        async for event in Agent.default.stt_node(self, audio):
            if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
                transcript = event.alternatives[0].text.lower()
                for phrase in TRIGGER_PHRASES:
                    if phrase in transcript:
                        logger.info(
                            f"Trigger phrase detected: '{phrase}' "
                            f"in transcript: '{transcript}'"
                        )
                        await self.handle_trigger(phrase, transcript)
                        break
            yield event

    async def handle_trigger(self, phrase: str, transcript: str):
        """Handle the detected trigger phrase."""
        if phrase in ("transfer me", "speak to a human", "real person"):
            logger.info("Initiating transfer to human agent")
            # In production, trigger your escalation workflow here
        elif phrase == "cancel my account":
            logger.warning("Account cancellation request detected")
```

**TypeScript**

```typescript
import { Agent, stt } from "@livekit/agents";

const TRIGGER_PHRASES = [
  "transfer me",
  "speak to a human",
  "real person",
  "cancel my account",
];

class KeywordAgent extends Agent {
  constructor() {
    super({ instructions: "You are a helpful customer service assistant." });
  }

  async *sttNode(
    audio: stt.SpeechStream
  ): AsyncGenerator<stt.SpeechEvent> {
    for await (const event of Agent.default.sttNode(this, audio)) {
      if (event.type === stt.SpeechEventType.FINAL_TRANSCRIPT) {
        const transcript = event.alternatives[0].text.toLowerCase();
        for (const phrase of TRIGGER_PHRASES) {
          if (transcript.includes(phrase)) {
            console.log(
              `Trigger phrase detected: '${phrase}' in: '${transcript}'`
            );
            await this.handleTrigger(phrase, transcript);
            break;
          }
        }
      }
      yield event;
    }
  }

  private async handleTrigger(phrase: string, transcript: string) {
    if (["transfer me", "speak to a human", "real person"].includes(phrase)) {
      console.log("Initiating transfer to human agent");
    } else if (phrase === "cancel my account") {
      console.warn("Account cancellation request detected");
    }
  }
}
```

Notice that the keyword detection happens inside the async generator loop but before the `yield`. Your custom logic runs before the event continues down the pipeline. The event is still yielded, so keyword detection is additive, not blocking. If you wanted to block the event from reaching the LLM, you would use `continue` instead of `yield`.
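The yield-versus-continue distinction is ordinary generator behavior, independent of LiveKit. A plain-Python sketch (the `filter_events` helper and sample strings are invented for illustration) shows both paths side by side:

```python
def filter_events(events, blocked):
    """Forward events downstream unless they are in the blocked set."""
    for event in events:
        if event in blocked:
            continue  # dropped: downstream consumers never see it
        yield event   # forwarded unchanged


events = ["hello", "badword", "goodbye"]
survivors = list(filter_events(events, blocked={"badword"}))
print(survivors)  # -> ['hello', 'goodbye']
```

In `stt_node` the same mechanics apply: whatever you `yield` continues to the LLM, and whatever you `continue` past vanishes from the pipeline.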
## Building a profanity detector
Now let's build something more substantial: a profanity detector that flags transcripts containing inappropriate language. This agent will either redact the offending content or skip the transcript entirely, preventing it from reaching the LLM.
**Python**

```python
from livekit.agents import Agent, stt
import typing
import re
import logging

logger = logging.getLogger("safe-agent")

# In production, use a proper content moderation library
BLOCKED_WORDS = {"badword1", "badword2", "offensive_term"}


class ProfanityFilterAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="You are a helpful, family-friendly assistant."
        )
        self.flagged_count = 0

    async def stt_node(
        self, audio: stt.SpeechStream
    ) -> typing.AsyncGenerator[stt.SpeechEvent, None]:
        async for event in Agent.default.stt_node(self, audio):
            if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
                transcript = event.alternatives[0].text
                result = self.check_profanity(transcript)
                if result["blocked"]:
                    self.flagged_count += 1
                    logger.warning(
                        f"Blocked transcript ({self.flagged_count} total): "
                        f"'{transcript}'"
                    )
                    # Skip this event entirely: the LLM never sees it
                    continue
                if result["redacted"]:
                    logger.info(f"Redacted transcript: '{result['text']}'")
                    # Modify the transcript text before yielding
                    event.alternatives[0].text = result["text"]
            yield event

    def check_profanity(self, text: str) -> dict:
        """Check text for profanity. Returns the action to take."""
        words = text.lower().split()
        found = [w for w in words if w in BLOCKED_WORDS]
        if not found:
            return {"blocked": False, "redacted": False, "text": text}
        # If more than half the words are blocked, drop the whole message
        if len(found) > len(words) / 2:
            return {"blocked": True, "redacted": False, "text": ""}
        # Otherwise, redact the offending words
        redacted = text
        for word in BLOCKED_WORDS:
            pattern = re.compile(re.escape(word), re.IGNORECASE)
            redacted = pattern.sub("[redacted]", redacted)
        return {"blocked": False, "redacted": True, "text": redacted}
```

**TypeScript**

```typescript
import { Agent, stt } from "@livekit/agents";

const BLOCKED_WORDS = new Set(["badword1", "badword2", "offensive_term"]);

class ProfanityFilterAgent extends Agent {
  private flaggedCount = 0;

  constructor() {
    super({ instructions: "You are a helpful, family-friendly assistant." });
  }

  async *sttNode(
    audio: stt.SpeechStream
  ): AsyncGenerator<stt.SpeechEvent> {
    for await (const event of Agent.default.sttNode(this, audio)) {
      if (event.type === stt.SpeechEventType.FINAL_TRANSCRIPT) {
        const transcript = event.alternatives[0].text;
        const result = this.checkProfanity(transcript);
        if (result.blocked) {
          this.flaggedCount++;
          console.warn(
            `Blocked transcript (${this.flaggedCount} total): '${transcript}'`
          );
          continue;
        }
        if (result.redacted) {
          console.log(`Redacted transcript: '${result.text}'`);
          event.alternatives[0].text = result.text;
        }
      }
      yield event;
    }
  }

  private checkProfanity(text: string): {
    blocked: boolean;
    redacted: boolean;
    text: string;
  } {
    const words = text.toLowerCase().split(/\s+/);
    const found = words.filter((w) => BLOCKED_WORDS.has(w));
    if (found.length === 0) {
      return { blocked: false, redacted: false, text };
    }
    if (found.length > words.length / 2) {
      return { blocked: true, redacted: false, text: "" };
    }
    let redacted = text;
    for (const word of BLOCKED_WORDS) {
      // Escape regex metacharacters, mirroring Python's re.escape
      const escaped = word.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
      const pattern = new RegExp(escaped, "gi");
      redacted = redacted.replace(pattern, "[redacted]");
    }
    return { blocked: false, redacted: true, text: redacted };
  }
}
```

### Intercept final transcripts
The filter only runs on `FINAL_TRANSCRIPT` events. Interim transcripts are passed through unmodified since they change rapidly and the final version is what matters.
### Check for profanity

The `check_profanity` method examines each word against a blocklist and decides whether to block the entire message, redact specific words, or pass it through unchanged.
### Block or redact

If the message is mostly profanity, `continue` skips the `yield` and the event disappears from the pipeline; the LLM never sees it. If only some words are problematic, they are replaced with `[redacted]` and the modified event is yielded.
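The block-versus-redact threshold can be exercised on its own. This sketch reproduces the same majority rule in a standalone function (the `action_for` name, blocklist, and sample inputs are invented for the demo):

```python
BLOCKED_WORDS = {"badword1", "badword2"}


def action_for(text: str) -> str:
    """Return 'pass', 'block', or 'redact' using the majority rule."""
    words = text.lower().split()
    found = [w for w in words if w in BLOCKED_WORDS]
    if not found:
        return "pass"
    # More than half the words blocked: drop the whole message.
    if len(found) > len(words) / 2:
        return "block"
    return "redact"


print(action_for("hello there"))           # -> pass
print(action_for("badword1 badword2 hi"))  # -> block (2 of 3 words)
print(action_for("please stop badword1"))  # -> redact (1 of 3 words)
```

The 50% cutoff is arbitrary; tune it to how aggressive you want blocking to be relative to redaction.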
### Track flagged content

The agent maintains a `flagged_count` counter. In production, you would log this to an analytics system to monitor abuse patterns.
### Simple word lists are not enough for production
The blocklist approach shown here is a teaching example. Real-world profanity detection requires handling misspellings, leetspeak, context-dependent words, and multiple languages. Use a dedicated content moderation library or an LLM-based filter (covered in Chapter 6) for production systems.
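To see how fragile a plain blocklist is, here is a small sketch that normalizes a few common leetspeak substitutions before checking the list (the `LEET_MAP`, blocklist, and test strings are invented for illustration). Even this only catches the simplest evasions:

```python
# Minimal leetspeak normalization: map a few digit/symbol
# substitutions back to letters before the blocklist check.
LEET_MAP = str.maketrans({"0": "o", "1": "i", "3": "e", "4": "a", "@": "a", "$": "s"})

BLOCKED_WORDS = {"badword"}


def is_blocked(word: str) -> bool:
    normalized = word.lower().translate(LEET_MAP)
    return normalized in BLOCKED_WORDS


print(is_blocked("badword"))   # -> True
print(is_blocked("B4dW0rd"))   # -> True: caught after normalization
print(is_blocked("bad word"))  # -> False: inserting a space still evades it
```

Spacing tricks, intentional misspellings, and context-dependent words all slip past lookups like this, which is why dedicated moderation tooling is the right choice in production.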
## What you learned

- The `stt_node` async generator receives a `SpeechStream` and yields `SpeechEvent` objects
- You can inspect `FINAL_TRANSCRIPT` events to detect keywords before the LLM processes them
- Using `continue` inside the generator loop drops events from the pipeline entirely
- Modifying `event.alternatives[0].text` lets you redact or transform transcripts in-flight
## Next up

In the next chapter, you will override `llm_node` to produce structured JSON output. You will build a chain-of-thought agent that returns its reasoning, its response, and an emotion tag, all parsed from streaming JSON.