Speech-to-Text WebSocket Examples
Complete examples for real-time speech recognition using WebSockets. Perfect for voice agents, live transcription, and interactive applications.
Why WebSockets for STT?
- Real-time: Transcribe speech as the user speaks
- Immediate Feedback: Get interim results instantly
- Interactive: Build conversational interfaces
- Efficient: Stream audio directly from the microphone
Quick Start
Basic WebSocket Transcription
```javascript
const ws = new WebSocket("wss://api.slng.ai/v1/stt/deepgram/nova:2");

ws.onopen = () => {
  // 1. Initialize session
  ws.send(
    JSON.stringify({
      type: "init",
      config: {
        language: "en",
        sample_rate: 16000,
        encoding: "linear16",
      },
    }),
  );

  // 2. Send audio data (from microphone or file)
  // ws.send(audioBuffer); // Binary data
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);

  if (message.type === "transcript") {
    console.log("Transcript:", message.text);
    console.log("Is final:", message.is_final);
    console.log("Confidence:", message.confidence);

    if (message.is_final) {
      // This is the final transcription for this segment
      displayFinalTranscript(message.text);
    } else {
      // This is an interim result (may change)
      showInterimTranscript(message.text);
    }
  }
};
```
Complete Examples
JavaScript/TypeScript - Microphone Streaming
Full Implementation
```typescript
class RealtimeSTT {
  private ws: WebSocket;
  private mediaStream: MediaStream | null = null;
  private audioContext: AudioContext | null = null;
  private processor: ScriptProcessorNode | null = null;
  private interimTranscript: string = "";
  private finalTranscript: string = "";

  constructor(provider: string, model: string, variant: string) {
    const url = `wss://api.slng.ai/v1/stt/${provider}/${model}:${variant}`;
    this.ws = new WebSocket(url);

    this.ws.onopen = () => this.handleOpen();
    this.ws.onmessage = (event) => this.handleMessage(event);
    this.ws.onerror = (error) => this.handleError(error);
    this.ws.onclose = () => this.handleClose();
  }

  private handleOpen() {
    console.log("WebSocket connected");

    // Initialize session
    this.send({
      type: "init",
      config: {
        language: "en",
        sample_rate: 16000,
        encoding: "linear16",
      },
    });
  }

  private handleMessage(event: MessageEvent) {
    const message = JSON.parse(event.data);

    switch (message.type) {
      case "ready":
        console.log("Session ready:", message.session_id);
        break;

      case "transcript":
        if (message.is_final) {
          // Final transcription
          this.finalTranscript += message.text + " ";
          this.interimTranscript = "";
          this.onFinalTranscript(message.text, message);
        } else {
          // Interim transcription
          this.interimTranscript = message.text;
          this.onInterimTranscript(message.text, message);
        }
        break;

      case "error":
        console.error("STT Error:", message.code, message.message);
        break;
    }
  }

  private handleError(error: Event) {
    console.error("WebSocket error:", error);
  }

  private handleClose() {
    console.log("WebSocket closed");
    this.stopMicrophone();
  }

  async startMicrophone() {
    try {
      // Request microphone access
      this.mediaStream = await navigator.mediaDevices.getUserMedia({
        audio: {
          sampleRate: 16000,
          channelCount: 1,
          echoCancellation: true,
          noiseSuppression: true,
        },
      });

      // Create audio context
      this.audioContext = new AudioContext({ sampleRate: 16000 });
      const source = this.audioContext.createMediaStreamSource(
        this.mediaStream,
      );

      // Create processor to capture audio
      this.processor = this.audioContext.createScriptProcessor(4096, 1, 1);

      this.processor.onaudioprocess = (e) => {
        if (this.ws.readyState === WebSocket.OPEN) {
          // Get audio data
          const inputData = e.inputBuffer.getChannelData(0);

          // Convert Float32Array to Int16Array (PCM)
          const pcmData = this.float32ToInt16(inputData);

          // Send to WebSocket as binary
          this.ws.send(pcmData.buffer);
        }
      };

      source.connect(this.processor);
      this.processor.connect(this.audioContext.destination);

      console.log("Microphone started");
    } catch (error) {
      console.error("Microphone error:", error);
      throw error;
    }
  }

  stopMicrophone() {
    if (this.processor) {
      this.processor.disconnect();
      this.processor = null;
    }

    if (this.mediaStream) {
      this.mediaStream.getTracks().forEach((track) => track.stop());
      this.mediaStream = null;
    }

    if (this.audioContext) {
      this.audioContext.close();
      this.audioContext = null;
    }

    console.log("Microphone stopped");
  }

  finalize() {
    this.send({ type: "finalize" });
  }

  close() {
    this.stopMicrophone();
    this.ws.close();
  }

  // Callbacks - override these
  onFinalTranscript(text: string, data: any) {
    console.log("Final:", text);
  }

  onInterimTranscript(text: string, data: any) {
    console.log("Interim:", text);
  }

  // Utility methods
  private send(message: any) {
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(message));
    }
  }

  private float32ToInt16(buffer: Float32Array): Int16Array {
    const int16 = new Int16Array(buffer.length);
    for (let i = 0; i < buffer.length; i++) {
      const s = Math.max(-1, Math.min(1, buffer[i]));
      int16[i] = s < 0 ? s * 0x8000 : s * 0x7fff;
    }
    return int16;
  }

  getFinalTranscript(): string {
    return this.finalTranscript.trim();
  }

  getInterimTranscript(): string {
    return this.interimTranscript;
  }
}

// Usage
const stt = new RealtimeSTT("deepgram", "nova", "2");

// Override callbacks
stt.onFinalTranscript = (text, data) => {
  document.getElementById("final").textContent += text + " ";
  console.log("Confidence:", data.confidence);
};

stt.onInterimTranscript = (text) => {
  document.getElementById("interim").textContent = text;
};

// Start transcribing
const startBtn = document.getElementById("start");
startBtn.addEventListener("click", async () => {
  await stt.startMicrophone();
  startBtn.disabled = true;
});

// Stop transcribing
const stopBtn = document.getElementById("stop");
stopBtn.addEventListener("click", () => {
  stt.finalize();
  stt.close();
  stopBtn.disabled = true;
});
```
Python - Microphone Streaming
Complete Implementation
```python
import websocket
import json
import pyaudio
import threading


class RealtimeSTT:
    def __init__(self, provider: str, model: str, variant: str, api_key: str):
        self.url = f"wss://api.slng.ai/v1/stt/{provider}/{model}:{variant}"
        self.api_key = api_key
        self.ws = None
        self.audio = None
        self.stream = None
        self.is_running = False

        # Transcription results
        self.final_transcript = []
        self.interim_transcript = ""

    def on_open(self, ws):
        print("WebSocket connected")

        # Initialize session
        ws.send(json.dumps({
            "type": "init",
            "config": {
                "language": "en",
                "sample_rate": 16000,
                "encoding": "linear16"
            }
        }))

        # Start microphone in background thread
        threading.Thread(target=self.stream_microphone, daemon=True).start()

    def on_message(self, ws, message):
        data = json.loads(message)

        if data["type"] == "ready":
            print(f"Session ready: {data['session_id']}")

        elif data["type"] == "transcript":
            if data["is_final"]:
                # Final transcription
                text = data["text"]
                self.final_transcript.append(text)
                self.interim_transcript = ""
                self.on_final_transcript(text, data)
            else:
                # Interim transcription
                self.interim_transcript = data["text"]
                self.on_interim_transcript(data["text"], data)

        elif data["type"] == "error":
            print(f"STT Error: {data['code']} - {data['message']}")

    def on_error(self, ws, error):
        print(f"WebSocket error: {error}")

    def on_close(self, ws, close_status_code, close_msg):
        print("WebSocket closed")
        self.stop_microphone()

    def stream_microphone(self):
        """Stream microphone audio to WebSocket."""
        self.audio = pyaudio.PyAudio()
        self.stream = self.audio.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=16000,
            input=True,
            frames_per_buffer=1024
        )

        print("Microphone started")
        self.is_running = True

        try:
            while self.is_running and self.ws and self.ws.sock and self.ws.sock.connected:
                # Read audio chunk
                audio_data = self.stream.read(1024, exception_on_overflow=False)

                # Send to WebSocket
                if self.ws and self.ws.sock:
                    self.ws.send(audio_data, opcode=websocket.ABNF.OPCODE_BINARY)
        except Exception as e:
            print(f"Microphone error: {e}")
        finally:
            self.stop_microphone()

    def stop_microphone(self):
        """Stop microphone streaming."""
        self.is_running = False

        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
            self.stream = None

        if self.audio:
            self.audio.terminate()
            self.audio = None

        print("Microphone stopped")

    def connect(self):
        """Connect to WebSocket and start transcription."""
        self.ws = websocket.WebSocketApp(
            self.url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_error=self.on_error,
            on_close=self.on_close
        )

        # Run WebSocket in background thread
        threading.Thread(target=self.ws.run_forever, daemon=True).start()

    def finalize(self):
        """Finalize transcription."""
        if self.ws and self.ws.sock:
            self.ws.send(json.dumps({"type": "finalize"}))

    def close(self):
        """Close connection and stop microphone."""
        self.stop_microphone()
        if self.ws:
            self.ws.close()

    def get_final_transcript(self) -> str:
        """Get complete final transcript."""
        return " ".join(self.final_transcript)

    def get_interim_transcript(self) -> str:
        """Get current interim transcript."""
        return self.interim_transcript

    # Override these methods
    def on_final_transcript(self, text: str, data: dict):
        """Called when final transcript is received."""
        print(f"Final: {text}")
        print(f"Confidence: {data.get('confidence', 'N/A')}")

    def on_interim_transcript(self, text: str, data: dict):
        """Called when interim transcript is received."""
        print(f"Interim: {text}", end='\r')


# Usage
import time

stt = RealtimeSTT('deepgram', 'nova', '2', 'YOUR_API_KEY')

# Override callbacks
def handle_final(text, data):
    print(f"\n✓ {text}")
    print(f"  Confidence: {data.get('confidence', 0):.2%}")

def handle_interim(text, data):
    print(f"... {text}", end='\r')

stt.on_final_transcript = handle_final
stt.on_interim_transcript = handle_interim

# Start transcribing
print("Starting transcription... (speak into your microphone)")
stt.connect()

# Let it run for 30 seconds
time.sleep(30)

# Finalize and close
stt.finalize()
time.sleep(1)
stt.close()

# Get full transcript
print("\nFull transcript:")
print(stt.get_final_transcript())
```
Voice Agent Pattern
Complete voice agent with STT + LLM + TTS:
```javascript
class VoiceAgent {
  constructor() {
    this.stt = new WebSocket("wss://api.slng.ai/v1/stt/deepgram/nova:2");
    this.tts = new WebSocket("wss://api.slng.ai/v1/tts/deepgram/aura:2");

    this.isListening = true;
    this.isAgentSpeaking = false;

    this.initializeSTT();
    this.initializeTTS();
  }

  initializeSTT() {
    this.stt.onopen = () => {
      this.stt.send(
        JSON.stringify({
          type: "init",
          config: {
            language: "en",
            sample_rate: 16000,
            encoding: "linear16",
          },
        }),
      );

      // Start microphone
      this.startMicrophone();
    };

    this.stt.onmessage = (event) => {
      const msg = JSON.parse(event.data);

      if (msg.type === "transcript" && msg.is_final) {
        // User spoke - handle input
        this.handleUserInput(msg.text);
      }
    };
  }

  initializeTTS() {
    this.tts.onopen = () => {
      this.tts.send(
        JSON.stringify({
          type: "init",
          config: {
            encoding: "linear16",
            sample_rate: 24000,
          },
        }),
      );
    };

    this.tts.onmessage = (event) => {
      if (event.data instanceof ArrayBuffer) {
        this.playAudio(event.data);
      } else {
        const msg = JSON.parse(event.data);
        if (msg.type === "flushed") {
          this.isAgentSpeaking = false;
        }
      }
    };
  }

  async handleUserInput(userText) {
    console.log("User said:", userText);

    // If agent is speaking, interrupt
    if (this.isAgentSpeaking) {
      this.interruptAgent();
    }

    // Get response from LLM
    const response = await this.generateResponse(userText);

    // Speak response
    this.speak(response);
  }

  interruptAgent() {
    // Cancel TTS
    this.tts.send(JSON.stringify({ type: "cancel" }));
    this.clearAudioBuffer();
    this.isAgentSpeaking = false;
  }

  speak(text) {
    this.isAgentSpeaking = true;

    this.tts.send(
      JSON.stringify({
        type: "speak",
        text: text,
      }),
    );

    this.tts.send(
      JSON.stringify({
        type: "flush",
      }),
    );
  }

  async generateResponse(userText) {
    // Call your LLM API here
    // Example with OpenAI:
    const response = await fetch("https://api.openai.com/v1/chat/completions", {
      method: "POST",
      headers: {
        Authorization: "Bearer YOUR_OPENAI_KEY",
        "Content-Type": "application/json",
      },
      body: JSON.stringify({
        model: "gpt-4",
        messages: [
          { role: "system", content: "You are a helpful voice assistant." },
          { role: "user", content: userText },
        ],
      }),
    });

    const data = await response.json();
    return data.choices[0].message.content;
  }

  startMicrophone() {
    // Your microphone streaming code here
  }

  playAudio(audioData) {
    // Your audio playback code here
  }

  clearAudioBuffer() {
    // Clear audio queue
  }
}

// Usage
const agent = new VoiceAgent();
console.log("Voice agent ready! Start speaking...");
```
Advanced Features
Voice Activity Detection (VAD)
Automatically detect when the user starts and stops speaking:
```typescript
class VADEnabledSTT extends RealtimeSTT {
  private isSpeaking: boolean = false;
  private silenceTimeout: number | null = null;
  private readonly SILENCE_DURATION = 1000; // 1 second

  onInterimTranscript(text: string, data: any) {
    if (text.trim().length > 0) {
      if (!this.isSpeaking) {
        this.isSpeaking = true;
        this.onSpeechStart();
      }

      // Reset silence timeout
      if (this.silenceTimeout) {
        clearTimeout(this.silenceTimeout);
      }

      this.silenceTimeout = setTimeout(() => {
        this.isSpeaking = false;
        this.onSpeechEnd();
      }, this.SILENCE_DURATION);
    }
  }

  onSpeechStart() {
    console.log("User started speaking");
    // Interrupt agent if needed
  }

  onSpeechEnd() {
    console.log("User stopped speaking");
    // Process complete utterance
  }
}
```
Keyword Spotting
React to specific keywords in real-time:
```typescript
class KeywordSpottingSTT extends RealtimeSTT {
  private keywords: Set<string>;

  constructor(provider, model, variant, keywords: string[]) {
    super(provider, model, variant);
    this.keywords = new Set(keywords.map((k) => k.toLowerCase()));
  }

  onInterimTranscript(text: string, data: any) {
    // Check for keywords in interim results
    const words = text.toLowerCase().split(" ");

    for (const word of words) {
      if (this.keywords.has(word)) {
        this.onKeywordDetected(word, text);
      }
    }
  }

  onKeywordDetected(keyword: string, fullText: string) {
    console.log(`Keyword detected: "${keyword}" in "${fullText}"`);
    // Trigger action
  }
}

// Usage
const stt = new KeywordSpottingSTT("deepgram", "nova", "2", [
  "help",
  "assistant",
  "cancel",
  "stop",
]);
```
Live Caption Display
Display transcriptions like YouTube live captions:
```typescript
class LiveCaptionSTT extends RealtimeSTT {
  private captionElement: HTMLElement;
  private maxLines: number = 3;
  private lines: string[] = [];

  constructor(provider, model, variant, captionElementId: string) {
    super(provider, model, variant);
    this.captionElement = document.getElementById(captionElementId)!;
  }

  onFinalTranscript(text: string, data: any) {
    this.lines.push(text);

    // Keep only last N lines
    if (this.lines.length > this.maxLines) {
      this.lines.shift();
    }

    this.updateDisplay();
  }

  onInterimTranscript(text: string, data: any) {
    // Show interim text below final lines
    const interim = `<span class="interim">${text}</span>`;
    this.updateDisplay(interim);
  }

  updateDisplay(interimText: string = "") {
    const finalLines = this.lines
      .map((line) => `<div class="caption-line">${line}</div>`)
      .join("");

    const interim = interimText
      ? `<div class="caption-line interim">${interimText}</div>`
      : "";

    this.captionElement.innerHTML = finalLines + interim;
  }
}

// CSS
/*
.caption-line {
  background: rgba(0, 0, 0, 0.8);
  color: white;
  padding: 8px 16px;
  margin: 4px 0;
  border-radius: 4px;
  font-size: 18px;
}

.interim {
  opacity: 0.6;
  font-style: italic;
}
*/
```
Model Comparison
Deepgram Nova (Recommended for Real-time)
Best for: Voice agents, live transcription, real-time applications
```javascript
const ws = new WebSocket("wss://api.slng.ai/v1/stt/deepgram/nova:2");
```
Features:
- Lowest latency (sub-300ms)
- Interim results
- High accuracy for conversational speech
- Word-level timestamps
- Confidence scores (both illustrated in the sketch after this list)
- Automatic punctuation
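The examples above already read `confidence` from transcript messages. Word-level timing, if the provider returns it, would arrive on a per-word field; the sketch below assumes a `words` array with `word`, `start`, and `end` properties, which are assumed names rather than documented fields, so check the provider's message schema before relying on them.

```javascript
// Sketch: log confidence plus hypothetical word-level timestamps.
// `message.words`, `w.word`, `w.start`, `w.end` are ASSUMED field names.
ws.onmessage = (event) => {
  const message = JSON.parse(event.data);
  if (message.type !== "transcript" || !message.is_final) return;

  console.log(`"${message.text}" (confidence: ${message.confidence})`);

  if (Array.isArray(message.words)) {
    for (const w of message.words) {
      console.log(`  ${w.word}: ${w.start}s -> ${w.end}s`);
    }
  }
};
```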
SLNG Whisper (High Accuracy)
Best for: Recorded audio, highest accuracy needs
```javascript
const ws = new WebSocket(
  "wss://api.slng.ai/v1/stt/slng/openai/whisper:large-v3",
);
```
Features:
- Highest accuracy
- Multi-language support (100+; see the init sketch after this list)
- Robust noise handling
- Segment timestamps
- Good for accents
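Language selection reuses the same `init` message shown earlier. A minimal sketch, assuming Whisper accepts the same short language codes as the other examples (here `"de"` as a stand-in for German):

```javascript
const ws = new WebSocket(
  "wss://api.slng.ai/v1/stt/slng/openai/whisper:large-v3",
);

ws.onopen = () => {
  // Same init shape as the examples above; "de" is an assumed language code
  ws.send(
    JSON.stringify({
      type: "init",
      config: {
        language: "de",
        sample_rate: 16000,
        encoding: "linear16",
      },
    }),
  );
};
```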
Troubleshooting
Issue: No transcription received
Solutions:
- Check audio format (16kHz, mono, PCM16)
- Verify microphone permissions
- Ensure audio data is being sent
- Check WebSocket connection state (both checks are sketched below)
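A minimal debugging sketch for the last two checks: guard every send on the socket being open, and keep a simple byte counter (the counter and `sendAudioChunk` helper are illustrative, not part of the API):

```javascript
// Guard every send and keep a counter to confirm audio is actually flowing.
let bytesSent = 0;

function sendAudioChunk(ws, pcmBuffer) {
  if (ws.readyState !== WebSocket.OPEN) {
    console.warn("Socket not open (state:", ws.readyState, ") - dropping chunk");
    return;
  }
  ws.send(pcmBuffer); // must be 16 kHz, mono, 16-bit PCM per the init config
  bytesSent += pcmBuffer.byteLength;
}

setInterval(() => console.log(`Audio sent so far: ${bytesSent} bytes`), 2000);
```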
Issue: High latency
Solutions:
- Use Deepgram Nova for lowest latency
- Reduce audio chunk size (see the sketch after this list)
- Check network connection
- Verify server region proximity
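The microphone example above uses a 4096-sample ScriptProcessor buffer (roughly 256 ms at 16 kHz). A smaller buffer sends audio more often and can reduce interim-result latency, at the cost of more WebSocket messages. A sketch of the change, reusing `audioContext`, `ws`, and the `float32ToInt16` helper from the class above:

```javascript
// 1024 samples at 16 kHz ≈ 64 ms per chunk (vs ~256 ms with 4096).
// Smaller buffers lower latency but increase message overhead.
const processor = audioContext.createScriptProcessor(1024, 1, 1);

processor.onaudioprocess = (e) => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(float32ToInt16(e.inputBuffer.getChannelData(0)).buffer);
  }
};
```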
Issue: Poor accuracy
Solutions:
- Ensure clean audio input
- Use noise suppression
- Specify the correct language (both shown in the sketch after this list)
- Check microphone quality
- Reduce background noise
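Noise suppression and language selection both map to settings already shown above. A combined sketch (browser support for the audio constraints varies by platform):

```javascript
// Minimal sketch: cleaner capture plus an explicit language hint.
async function setupCleanCapture(ws) {
  // Let the browser apply echo cancellation and noise suppression.
  const mediaStream = await navigator.mediaDevices.getUserMedia({
    audio: {
      sampleRate: 16000,
      channelCount: 1,
      echoCancellation: true,
      noiseSuppression: true,
    },
  });

  // Tell the model which language to expect instead of relying on detection.
  ws.send(
    JSON.stringify({
      type: "init",
      config: { language: "en", sample_rate: 16000, encoding: "linear16" },
    }),
  );

  return mediaStream;
}
```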