Add Deepgram MCP Server - speech-to-text and TTS

Python FastMCP server wrapping Deepgram API for audio transcription
and text-to-speech. Supports 125+ multilingual voices, large file
chunking via FFmpeg, formatted markdown output with speaker
diarization, and Docker deployment on port 8009.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Alejandro Gutiérrez
2026-02-18 15:17:52 +01:00
parent ea5775da25
commit 0ba2896565
13 changed files with 1583 additions and 0 deletions

View File

@@ -0,0 +1 @@
DEEPGRAM_API_KEY=your_api_key_here

21
deepgram-mcp/Dockerfile Normal file
View File

@@ -0,0 +1,21 @@
# Slim Python 3.11 base image.
FROM python:3.11-slim
# ffmpeg is needed for audio conversion/splitting; curl is used by the
# HEALTHCHECK below. The apt lists are removed to keep the layer small.
RUN apt-get update && \
apt-get install -y --no-install-recommends ffmpeg curl && \
rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Install dependencies before copying the source so this layer stays cached
# when only application code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ src/
# Make the package under src/ importable for `python -m deepgram_mcp.server`.
ENV PYTHONPATH=/app/src
EXPOSE 8009
# Probe the server's /health route on port 8009.
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD curl -f http://localhost:8009/health || exit 1
CMD ["python", "-m", "deepgram_mcp.server"]

View File

@@ -0,0 +1,21 @@
# Single-service stack for the Deepgram MCP server.
services:
  deepgram-mcp:
    build: .
    container_name: deepgram-mcp
    restart: unless-stopped
    ports:
      - "8009:8009"
    # Named volumes hold uploaded audio and generated TTS output.
    volumes:
      - deepgram-uploads:/data/uploads
      - deepgram-tts:/data/tts_output
    # .env supplies secrets such as DEEPGRAM_API_KEY.
    env_file:
      - .env
    # These values match the paths mounted above and the published port.
    environment:
      - UPLOAD_DIR=/data/uploads
      - TTS_DIR=/data/tts_output
      - HOST=0.0.0.0
      - PORT=8009
volumes:
  deepgram-uploads:
  deepgram-tts:

View File

@@ -0,0 +1,7 @@
fastmcp>=2.0.0
httpx
aiofiles
python-dotenv
python-multipart
starlette
uvicorn

View File

@@ -0,0 +1 @@
# Deepgram MCP Server

View File

@@ -0,0 +1,101 @@
"""File upload, download, and listing management for Deepgram MCP server."""
import os
import re
from datetime import datetime, timezone
from pathlib import Path
import aiofiles
# Storage locations; each can be overridden through the environment.
UPLOAD_DIR = Path(os.getenv("UPLOAD_DIR", "/data/uploads"))
TTS_DIR = Path(os.getenv("TTS_DIR", "/data/tts_output"))
# NOTE: both directories are created as a side effect of importing this module.
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
TTS_DIR.mkdir(parents=True, exist_ok=True)
def _sanitize_filename(filename: str) -> str:
"""Strip path components and dangerous characters from a filename."""
# Take only the basename (no directory traversal)
name = Path(filename).name
# Remove any remaining path separators or null bytes
name = re.sub(r'[/\\:\x00]', '', name)
# Collapse whitespace
name = re.sub(r'\s+', '_', name.strip())
if not name:
name = "unnamed_file"
return name
def _timestamp_prefix() -> str:
"""Generate a timestamp prefix for collision avoidance."""
return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
async def save_upload(filename: str, content: bytes) -> dict:
    """Write uploaded bytes into UPLOAD_DIR under a timestamp-prefixed name.

    The timestamp prefix only has one-second resolution, so when the
    computed name already exists a numeric counter is inserted after the
    timestamp instead of overwriting the earlier upload.

    Args:
        filename: Client-supplied filename; sanitized before use.
        content: Complete file content.

    Returns:
        Dict with ``filename`` (stored name), ``path`` (absolute path as a
        string) and ``size_mb`` (size rounded to two decimals).
    """
    safe_name = _sanitize_filename(filename)
    stamp = _timestamp_prefix()
    stored_name = f"{stamp}_{safe_name}"
    dest = UPLOAD_DIR / stored_name
    # Best-effort uniqueness: the check is not atomic, but it prevents the
    # common case of two same-named uploads landing in the same second.
    attempt = 1
    while dest.exists():
        stored_name = f"{stamp}_{attempt}_{safe_name}"
        dest = UPLOAD_DIR / stored_name
        attempt += 1
    async with aiofiles.open(dest, "wb") as f:
        await f.write(content)
    size_mb = round(dest.stat().st_size / (1024 * 1024), 2)
    return {
        "filename": stored_name,
        "path": str(dest),
        "size_mb": size_mb,
    }
def list_files(directory: Path) -> list[dict]:
    """Describe every regular file directly inside ``directory``.

    Returns one dict per file, sorted by path, with ``name``, ``size_mb``
    (rounded to two decimals) and ``modified`` (UTC ISO-8601 string).
    A path that is not a directory yields an empty list.
    """
    if not directory.is_dir():
        return []
    listing: list[dict] = []
    for path in sorted(directory.iterdir()):
        # Sub-directories and other non-file entries are skipped.
        if not path.is_file():
            continue
        info = path.stat()
        modified_at = datetime.fromtimestamp(info.st_mtime, tz=timezone.utc)
        listing.append(
            {
                "name": path.name,
                "size_mb": round(info.st_size / (1024 * 1024), 2),
                "modified": modified_at.isoformat(),
            }
        )
    return listing
def delete_file(directory: Path, filename: str) -> bool:
    """Remove ``filename`` from ``directory``.

    Returns True when a file was deleted, False when the sanitized name
    resolves outside ``directory`` or does not refer to an existing file.
    """
    target = directory / _sanitize_filename(filename)
    # Refuse anything whose resolved location escapes the directory.
    if not target.resolve().is_relative_to(directory.resolve()):
        return False
    if not target.is_file():
        return False
    target.unlink()
    return True
def get_file_path(directory: Path, filename: str) -> Path | None:
    """Look up ``filename`` inside ``directory``.

    Returns the path when the sanitized name stays within ``directory``
    and refers to an existing file; otherwise None.
    """
    candidate = directory / _sanitize_filename(filename)
    stays_inside = candidate.resolve().is_relative_to(directory.resolve())
    if stays_inside and candidate.is_file():
        return candidate
    return None

View File

@@ -0,0 +1,332 @@
"""Format Deepgram JSON responses into readable markdown."""
from __future__ import annotations
def format_timestamp(seconds: float) -> str:
    """Render a second count as ``H:MM:SS``, or ``M:SS`` when under an hour.

    Fractional seconds are truncated.
    """
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    if hours > 0:
        return f"{hours}:{minutes:02d}:{secs:02d}"
    return f"{minutes}:{secs:02d}"
def format_duration(seconds: float) -> str:
    """Render a second count as e.g. ``'32s'``, ``'5m 32s'`` or ``'1h 5m 32s'``.

    Fractional seconds are truncated. Once hours are shown, minutes are
    always shown too, even when zero.
    """
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    if hours > 0:
        return f"{hours}h {minutes}m {secs}s"
    if minutes > 0:
        return f"{minutes}m {secs}s"
    return f"{secs}s"
def truncate_result(text: str, max_chars: int = 80000) -> tuple[str, bool]:
    """Shorten ``text`` to at most ``max_chars`` characters plus a notice.

    The cut is moved back to the last newline inside the limit when one
    exists past the first character. Returns ``(text, was_truncated)``.
    """
    if len(text) <= max_chars:
        return text, False
    head = text[:max_chars]
    before, newline, _ = head.rpartition("\n")
    # Only cut at the newline when something remains in front of it.
    if newline and before:
        head = before
    notice = "\n\n---\n*[Truncated - full transcript saved to file]*"
    return head + notice, True
def format_transcription(response: dict, include_timestamps: bool = True) -> str:
    """Render a Deepgram transcription response as markdown.

    Only the first alternative of the first channel is used. Sections are
    emitted in a fixed order (metadata, transcript, summary, topics,
    entities, sentiment, intents, search results); empty ones are dropped.

    Args:
        response: Raw Deepgram JSON response dict.
        include_timestamps: Whether to include timestamps in transcript output.

    Returns:
        The non-empty sections joined by blank lines.
    """
    metadata = response.get("metadata") or {}
    results = response.get("results") or {}
    channels = results.get("channels") or []
    alternatives = (channels[0].get("alternatives") or []) if channels else []
    first_alt = alternatives[0] if alternatives else {}

    rendered = [
        _format_metadata(metadata, first_alt),
        _format_transcript(first_alt, results.get("utterances"), include_timestamps),
        _format_summaries(first_alt),
        _format_topics(first_alt),
        _format_entities(first_alt),
        _format_sentiment(first_alt),
        _format_intents(first_alt),
        _format_search(first_alt),
    ]
    return "\n\n".join(part for part in rendered if part)
def _format_metadata(metadata: dict, first_alt: dict) -> str:
"""Build the metadata header section."""
lines = ["## Transcription Results"]
duration = metadata.get("duration")
if duration is not None:
lines.append(f"- **Duration:** {format_duration(duration)}")
model_info = metadata.get("model_info")
if model_info and isinstance(model_info, dict):
for info in model_info.values():
name = info.get("name") if isinstance(info, dict) else None
if name:
lines.append(f"- **Model:** {name}")
break
confidence = first_alt.get("confidence")
if confidence is not None:
lines.append(f"- **Confidence:** {confidence * 100:.1f}%")
num_channels = metadata.get("channels")
if num_channels is not None:
lines.append(f"- **Channels:** {num_channels}")
return "\n".join(lines)
def _format_transcript(
first_alt: dict,
utterances: list[dict] | None,
include_timestamps: bool,
) -> str:
"""Build the transcript section using utterances, paragraphs, or plain text."""
# Prefer utterances (diarized output)
if utterances:
lines = ["### Transcript", ""]
for utt in utterances:
speaker = utt.get("speaker", "?")
text = utt.get("transcript", "").strip()
if include_timestamps:
start = format_timestamp(utt.get("start", 0))
end = format_timestamp(utt.get("end", 0))
lines.append(f"**Speaker {speaker}** ({start} - {end}): {text}")
else:
lines.append(f"**Speaker {speaker}**: {text}")
lines.append("")
return "\n".join(lines).rstrip()
# Fall back to paragraphs
paragraphs_data = first_alt.get("paragraphs")
if paragraphs_data and isinstance(paragraphs_data, dict):
paras = paragraphs_data.get("paragraphs") or []
if paras:
lines = ["### Transcript", ""]
for para in paras:
speaker = para.get("speaker")
sentences = para.get("sentences") or []
text = " ".join(s.get("text", "") for s in sentences).strip()
if not text:
continue
if speaker is not None and include_timestamps:
start = format_timestamp(para.get("start", 0))
end = format_timestamp(para.get("end", 0))
lines.append(
f"**Speaker {speaker}** ({start} - {end}): {text}"
)
elif speaker is not None:
lines.append(f"**Speaker {speaker}**: {text}")
else:
lines.append(text)
lines.append("")
return "\n".join(lines).rstrip()
# Fall back to plain transcript
transcript = first_alt.get("transcript", "").strip()
if transcript:
return f"### Transcript\n\n{transcript}"
return ""
def _format_summaries(first_alt: dict) -> str:
"""Build the summary section."""
summaries = first_alt.get("summaries")
if not summaries:
return ""
texts = [s.get("summary", "") for s in summaries if s.get("summary")]
if not texts:
return ""
return "### Summary\n\n" + "\n\n".join(texts)
def _format_topics(first_alt: dict) -> str:
"""Build the topics section."""
topics_data = first_alt.get("topics")
if not topics_data or not isinstance(topics_data, dict):
return ""
segments = topics_data.get("segments") or []
# Collect unique topics with their highest confidence
seen: dict[str, float] = {}
for seg in segments:
for t in seg.get("topics") or []:
topic = t.get("topic", "")
conf = t.get("confidence", 0)
if topic and (topic not in seen or conf > seen[topic]):
seen[topic] = conf
if not seen:
return ""
lines = ["### Topics"]
for topic, conf in sorted(seen.items(), key=lambda x: x[1], reverse=True):
lines.append(f"- **{topic}** ({conf * 100:.1f}%)")
return "\n".join(lines)
def _format_entities(first_alt: dict) -> str:
"""Build the entities table."""
entities_data = first_alt.get("entities")
if not entities_data or not isinstance(entities_data, dict):
return ""
segments = entities_data.get("segments") or []
rows: list[tuple[str, str, float]] = []
for seg in segments:
for ent in seg.get("entities") or []:
label = ent.get("label", "")
value = ent.get("value", "")
conf = ent.get("confidence", 0)
if label and value:
rows.append((label, value, conf))
if not rows:
return ""
lines = [
"### Entities",
"",
"| Type | Value | Confidence |",
"|------|-------|------------|",
]
for label, value, conf in rows:
lines.append(f"| {label} | {value} | {conf * 100:.1f}% |")
return "\n".join(lines)
def _format_sentiment(first_alt: dict) -> str:
"""Build the sentiment section."""
sentiments_data = first_alt.get("sentiments")
if not sentiments_data or not isinstance(sentiments_data, dict):
return ""
lines = ["### Sentiment"]
average = sentiments_data.get("average")
if average and isinstance(average, dict):
sentiment = average.get("sentiment", "")
score = average.get("sentiment_score")
if sentiment and score is not None:
lines.append(f"\n**Overall:** {sentiment.capitalize()} ({score:.2f})")
segments = sentiments_data.get("segments") or []
if segments:
lines.append("")
lines.append("| Segment | Sentiment | Score |")
lines.append("|---------|-----------|-------|")
for seg in segments:
text = seg.get("text", "").strip()
sentiment = seg.get("sentiment", "")
score = seg.get("sentiment_score")
if text and sentiment and score is not None:
# Truncate long segment text for table readability
display = text if len(text) <= 60 else text[:57] + "..."
lines.append(
f'| "{display}" | {sentiment.capitalize()} | {score:.2f} |'
)
if len(lines) <= 1:
return ""
return "\n".join(lines)
def _format_intents(first_alt: dict) -> str:
"""Build the intents section."""
intents_data = first_alt.get("intents")
if not intents_data or not isinstance(intents_data, dict):
return ""
segments = intents_data.get("segments") or []
# Collect unique intents with highest confidence
seen: dict[str, float] = {}
for seg in segments:
for intent in seg.get("intents") or []:
name = intent.get("intent", "")
conf = intent.get("confidence", 0)
if name and (name not in seen or conf > seen[name]):
seen[name] = conf
if not seen:
return ""
lines = ["### Intents"]
for name, conf in sorted(seen.items(), key=lambda x: x[1], reverse=True):
lines.append(f"- **{name}** ({conf * 100:.1f}%)")
return "\n".join(lines)
def _format_search(first_alt: dict) -> str:
"""Build the search results section with timestamps."""
search_data = first_alt.get("search")
if not search_data:
return ""
lines = ["### Search Results"]
for group in search_data:
query = group.get("query", "")
hits = group.get("hits") or []
lines.append(f"\n**\"{query}\"**")
if not hits:
lines.append("No matches found.")
continue
for hit in hits:
snippet = hit.get("snippet", "")
start = hit.get("start", 0)
end = hit.get("end", 0)
conf = hit.get("confidence", 0)
lines.append(
f"- ({format_timestamp(start)} - {format_timestamp(end)}) "
f"*{snippet}* ({conf * 100:.1f}%)"
)
if len(lines) <= 1:
return ""
return "\n".join(lines)

View File

@@ -0,0 +1,461 @@
"""Deepgram MCP Server — FastMCP 2.x with custom HTTP routes."""
import asyncio
import os
from pathlib import Path
import aiofiles
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.requests import Request
from starlette.responses import FileResponse, JSONResponse, Response
from deepgram_mcp import file_manager, formatter, transcription, tts
# Load variables from a local .env file (if any) into the process environment.
load_dotenv()
# Server instance that the @mcp.tool and @mcp.custom_route decorators in
# this module register against.
mcp = FastMCP("Deepgram MCP")
# ---------------------------------------------------------------------------
# Shared transcription parameter docstring
# ---------------------------------------------------------------------------
_TRANSCRIBE_PARAMS_DOC = """
Parameters:
model: Deepgram model (nova-3, nova-2, enhanced, base, whisper-large). Default: nova-3
language: BCP-47 language code (e.g. en, es, fr). Omit for auto-detect.
detect_language: Auto-detect language (bool).
smart_format: Enable smart formatting (bool, default True).
punctuate: Add punctuation (bool).
paragraphs: Split into paragraphs (bool).
numerals: Convert numbers to digits (bool).
measurements: Format measurements (bool).
dictation: Dictation mode with spoken punctuation (bool).
diarize: Speaker diarization (bool, default True).
utterances: Return utterances (bool).
utt_split: Pause threshold in seconds for utterance splitting (float).
summarize: Generate summary (bool).
topics: Detect topics (bool).
sentiment: Analyze sentiment (bool).
entities: Detect entities (bool).
intents: Detect intents (bool).
custom_topics: Comma-separated custom topics (up to 100).
custom_intents: Comma-separated custom intents.
keywords: Comma-separated "term:boost" pairs for keyword boosting.
keyterm: Prompting term for Nova-3.
search: Comma-separated terms to search for in audio.
redact: Comma-separated redaction types (pci, pii, numbers).
profanity_filter: Filter profanity (bool).
replace: Comma-separated "find:replace" pairs.
filler_words: Transcribe filler words like um, uh (bool).
multichannel: Treat each channel independently (bool).
encoding: Audio encoding (linear16, flac, mulaw, opus, etc.).
sample_rate: Audio sample rate in Hz.
"""
def _collect_options(**kwargs) -> dict:
"""Filter out None values from tool kwargs to build options dict."""
return {k: v for k, v in kwargs.items() if v is not None}
async def _do_transcribe(source, **kwargs) -> str:
    """Transcribe ``source`` and return the formatted markdown result.

    ``kwargs`` are the tool parameters; entries set to None are dropped
    before being passed to the transcription layer. When the formatted
    text exceeds the truncation limit, the complete text is written to a
    uniquely named markdown file in the TTS output directory and its path
    is appended to the truncated text.

    Args:
        source: Audio source handed to ``transcription.transcribe``
            (callers in this module pass a ``Path`` or a URL string).
        **kwargs: Transcription options.

    Returns:
        Markdown text, truncated if necessary.
    """
    # Local import: this module does not import datetime at file level.
    from datetime import datetime, timezone

    options = _collect_options(**kwargs)
    result = await transcription.transcribe(source, options)
    # Format once and reuse the untruncated text for the saved copy.
    full_text = formatter.format_transcription(result)
    text, was_truncated = formatter.truncate_result(full_text)
    if not was_truncated:
        return text

    # A timestamped name (microsecond resolution) keeps consecutive or
    # concurrent transcriptions from overwriting each other's saved copy.
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
    save_path = file_manager.TTS_DIR / f"full_transcript_{stamp}.md"
    async with aiofiles.open(save_path, "w", encoding="utf-8") as f:
        await f.write(full_text)
    return text + f"\n\nFull transcript saved to: {save_path}"
# ---------------------------------------------------------------------------
# Transcription tools
# ---------------------------------------------------------------------------
@mcp.tool(description="Transcribe audio from a file path on the NUC server." + _TRANSCRIBE_PARAMS_DOC)
async def transcribe_file(
    path: str,
    model: str = "nova-3",
    language: str | None = None,
    detect_language: bool | None = None,
    smart_format: bool = True,
    punctuate: bool | None = None,
    paragraphs: bool | None = None,
    numerals: bool | None = None,
    measurements: bool | None = None,
    dictation: bool | None = None,
    diarize: bool = True,
    utterances: bool | None = None,
    utt_split: float | None = None,
    summarize: bool | None = None,
    topics: bool | None = None,
    sentiment: bool | None = None,
    entities: bool | None = None,
    intents: bool | None = None,
    custom_topics: str | None = None,
    custom_intents: str | None = None,
    keywords: str | None = None,
    keyterm: str | None = None,
    search: str | None = None,
    redact: str | None = None,
    profanity_filter: bool | None = None,
    replace: str | None = None,
    filler_words: bool | None = None,
    multichannel: bool | None = None,
    encoding: str | None = None,
    sample_rate: int | None = None,
) -> str:
    """Transcribe an audio file located at ``path`` on the server's filesystem.

    Returns the formatted transcript, or an error string when ``path`` is
    not an existing file.
    """
    # Snapshot the arguments before any other local is bound, so the dict
    # holds exactly the tool parameters, in signature order.
    options = dict(locals())
    del options["path"]

    audio = Path(path)
    if audio.is_file():
        return await _do_transcribe(audio, **options)
    return f"Error: File not found: {path}"
@mcp.tool(description="Transcribe audio from a public URL." + _TRANSCRIBE_PARAMS_DOC)
async def transcribe_url(
    url: str,
    model: str = "nova-3",
    language: str | None = None,
    detect_language: bool | None = None,
    smart_format: bool = True,
    punctuate: bool | None = None,
    paragraphs: bool | None = None,
    numerals: bool | None = None,
    measurements: bool | None = None,
    dictation: bool | None = None,
    diarize: bool = True,
    utterances: bool | None = None,
    utt_split: float | None = None,
    summarize: bool | None = None,
    topics: bool | None = None,
    sentiment: bool | None = None,
    entities: bool | None = None,
    intents: bool | None = None,
    custom_topics: str | None = None,
    custom_intents: str | None = None,
    keywords: str | None = None,
    keyterm: str | None = None,
    search: str | None = None,
    redact: str | None = None,
    profanity_filter: bool | None = None,
    replace: str | None = None,
    filler_words: bool | None = None,
    multichannel: bool | None = None,
    encoding: str | None = None,
    sample_rate: int | None = None,
) -> str:
    """Transcribe audio fetched from an ``http://`` or ``https://`` URL.

    Returns the formatted transcript, or an error string when the URL
    uses any other scheme.
    """
    # Snapshot the arguments before any other local is bound, so the dict
    # holds exactly the tool parameters, in signature order.
    options = dict(locals())
    del options["url"]

    if url.startswith(("http://", "https://")):
        return await _do_transcribe(url, **options)
    return "Error: URL must start with http:// or https://"
@mcp.tool(description="Transcribe a previously uploaded audio file." + _TRANSCRIBE_PARAMS_DOC)
async def transcribe_uploaded(
    filename: str,
    model: str = "nova-3",
    language: str | None = None,
    detect_language: bool | None = None,
    smart_format: bool = True,
    punctuate: bool | None = None,
    paragraphs: bool | None = None,
    numerals: bool | None = None,
    measurements: bool | None = None,
    dictation: bool | None = None,
    diarize: bool = True,
    utterances: bool | None = None,
    utt_split: float | None = None,
    summarize: bool | None = None,
    topics: bool | None = None,
    sentiment: bool | None = None,
    entities: bool | None = None,
    intents: bool | None = None,
    custom_topics: str | None = None,
    custom_intents: str | None = None,
    keywords: str | None = None,
    keyterm: str | None = None,
    search: str | None = None,
    redact: str | None = None,
    profanity_filter: bool | None = None,
    replace: str | None = None,
    filler_words: bool | None = None,
    multichannel: bool | None = None,
    encoding: str | None = None,
    sample_rate: int | None = None,
) -> str:
    """Transcribe a file from the upload directory (see the /upload route).

    Returns the formatted transcript, or an error string when no such
    uploaded file exists.
    """
    # Snapshot the arguments before any other local is bound, so the dict
    # holds exactly the tool parameters, in signature order.
    options = dict(locals())
    del options["filename"]

    resolved = file_manager.get_file_path(file_manager.UPLOAD_DIR, filename)
    if resolved is None:
        return f"Error: Uploaded file not found: {filename}"
    return await _do_transcribe(resolved, **options)
# ---------------------------------------------------------------------------
# TTS tools
# ---------------------------------------------------------------------------
@mcp.tool(description="Convert text to speech using Deepgram Aura-2 voices. Returns download URL for the generated audio file.")
async def text_to_speech(
    text: str,
    model: str = "aura-2-asteria-en",
    encoding: str = "mp3",
    sample_rate: int = 24000,
    container: str | None = None,
) -> str:
    """Generate speech audio from text and store it in the TTS output directory.

    The host in the download link comes from the ``PUBLIC_HOST``
    environment variable (default ``192.168.1.3``, the address that was
    previously hard-coded) and the port from ``PORT``.

    Returns:
        Markdown summary with file name, size, model, encoding and
        download URL.
    """
    audio_bytes, filename = await tts.text_to_speech(
        text, model=model, encoding=encoding,
        sample_rate=sample_rate, container=container,
    )
    save_path = file_manager.TTS_DIR / filename
    async with aiofiles.open(save_path, "wb") as f:
        await f.write(audio_bytes)
    size_mb = round(len(audio_bytes) / (1024 * 1024), 2)
    # HOST is the bind address (typically 0.0.0.0) and is not reachable by
    # clients, so the advertised host is configured separately.
    public_host = os.getenv("PUBLIC_HOST", "192.168.1.3")
    port = os.getenv("PORT", "8009")
    download_url = f"http://{public_host}:{port}/files/{filename}"
    return (
        f"Audio generated successfully.\n"
        f"- **File:** {filename}\n"
        f"- **Size:** {size_mb} MB\n"
        f"- **Model:** {model}\n"
        f"- **Encoding:** {encoding}\n"
        f"- **Download:** {download_url}"
    )
@mcp.tool(description="List available Deepgram Aura-2 TTS voices. Optionally filter by language code (en, es, de, fr, nl, it, ja).")
async def list_tts_voices(language: str | None = None) -> str:
    """Render the available TTS voices as markdown.

    A language heading is emitted each time the language changes between
    consecutive voices in the list returned by ``tts.list_voices``.
    """
    voices = tts.list_voices(language)
    if not voices:
        return f"No voices found for language: {language}"

    out = [f"## Available TTS Voices ({len(voices)} total)\n"]
    last_language = None
    for voice in voices:
        lang_code = voice["language"]
        if lang_code != last_language:
            last_language = lang_code
            out.append(f"\n### {lang_code.upper()}")
        # Anything other than "female" is shown as M.
        marker = "M" if voice["gender"] != "female" else "F"
        out.append(f"- `{voice['id']}` — {voice['name']} ({marker}) — {voice['description']}")
    return "\n".join(out)
# ---------------------------------------------------------------------------
# File management tools
# ---------------------------------------------------------------------------
@mcp.tool(description="List files in the upload directory.")
async def list_uploaded_files() -> str:
    """Return a markdown table of the files in the upload directory."""
    entries = file_manager.list_files(file_manager.UPLOAD_DIR)
    if not entries:
        return "No uploaded files found."
    table = [
        "## Uploaded Files\n",
        "| File | Size (MB) | Modified |",
        "|------|-----------|----------|",
    ]
    table.extend(
        f"| {entry['name']} | {entry['size_mb']} | {entry['modified']} |"
        for entry in entries
    )
    return "\n".join(table)
@mcp.tool(description="List generated TTS audio files.")
async def list_generated_files() -> str:
    """Return a markdown table of generated TTS files with download URLs.

    The host in each URL comes from the ``PUBLIC_HOST`` environment
    variable (default ``192.168.1.3``, the address that was previously
    hard-coded) and the port from ``PORT``.
    """
    files = file_manager.list_files(file_manager.TTS_DIR)
    if not files:
        return "No generated files found."
    public_host = os.getenv("PUBLIC_HOST", "192.168.1.3")
    port = os.getenv("PORT", "8009")
    base_url = f"http://{public_host}:{port}/files"
    lines = [
        "## Generated Files\n",
        "| File | Size (MB) | Download URL |",
        "|------|-----------|-------------|",
    ]
    for f in files:
        lines.append(f"| {f['name']} | {f['size_mb']} | {base_url}/{f['name']} |")
    return "\n".join(lines)
@mcp.tool(description="Get upload endpoint URL and example curl command for uploading audio files.")
async def get_upload_info() -> str:
    """Return the upload endpoint URL and an example curl command.

    The host comes from the ``PUBLIC_HOST`` environment variable (default
    ``192.168.1.3``, the address that was previously hard-coded) and the
    port from ``PORT``.
    """
    public_host = os.getenv("PUBLIC_HOST", "192.168.1.3")
    port = os.getenv("PORT", "8009")
    upload_url = f"http://{public_host}:{port}/upload"
    return (
        f"## File Upload\n\n"
        f"**Endpoint:** `POST {upload_url}`\n\n"
        f"**Example:**\n```bash\n"
        f"curl -X POST {upload_url} -F \"file=@recording.m4a\"\n"
        f"```\n\n"
        f"Then use `transcribe_uploaded(filename=\"...\")` with the returned filename."
    )
@mcp.tool(description="Delete an uploaded or generated file. file_type: 'upload' or 'generated'.")
async def delete_file(filename: str, file_type: str = "upload") -> str:
    """Delete a file from the upload or generated-files directory.

    Args:
        filename: Name of the file to delete.
        file_type: ``"upload"`` or ``"generated"``. Any other value is
            rejected instead of silently falling back to the generated
            directory, so a typo cannot delete from the wrong place.

    Returns:
        A human-readable status message.
    """
    directories = {
        "upload": file_manager.UPLOAD_DIR,
        "generated": file_manager.TTS_DIR,
    }
    directory = directories.get(file_type)
    if directory is None:
        return f"Error: file_type must be 'upload' or 'generated', got: {file_type!r}"
    if file_manager.delete_file(directory, filename):
        return f"Deleted: {filename}"
    return f"File not found or could not be deleted: {filename}"
# ---------------------------------------------------------------------------
# Utility tools
# ---------------------------------------------------------------------------
@mcp.tool(description="Convert audio format or sample rate using ffmpeg. Useful for preprocessing before transcription.")
async def convert_audio(
    input_path: str,
    output_format: str = "wav",
    sample_rate: int | None = None,
) -> str:
    """Convert an audio file to another format and/or sample rate with ffmpeg.

    The result is written to the upload directory as
    ``<input stem>_converted.<output_format>``; an existing file with
    that name is overwritten (ffmpeg ``-y``).

    Args:
        input_path: Path of the source file on the server.
        output_format: Target extension; must be purely alphanumeric
            (e.g. ``wav``, ``mp3``, ``flac``).
        sample_rate: Target sample rate in Hz; omitted when falsy.

    Returns:
        Markdown summary on success, otherwise an error message.
    """
    src = Path(input_path)
    if not src.is_file():
        return f"Error: Input file not found: {input_path}"
    # The format becomes part of the output path, so reject separators,
    # dots and anything else that could move the file out of UPLOAD_DIR.
    if not output_format.isalnum():
        return f"Error: Invalid output format: {output_format}"

    dest = file_manager.UPLOAD_DIR / f"{src.stem}_converted.{output_format}"
    cmd = ["ffmpeg", "-i", str(src), "-y"]
    if sample_rate:
        cmd.extend(["-ar", str(sample_rate)])
    cmd.append(str(dest))
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        # ffmpeg may echo non-UTF-8 bytes (e.g. from file metadata); never
        # let decoding the diagnostics raise.
        return f"Conversion failed: {stderr.decode(errors='replace').strip()}"
    size_mb = round(dest.stat().st_size / (1024 * 1024), 2)
    return (
        f"Converted successfully.\n"
        f"- **Output:** {dest}\n"
        f"- **Format:** {output_format}\n"
        f"- **Size:** {size_mb} MB"
    )
@mcp.tool(description="Verify Deepgram API key and check account/project info.")
async def check_api_status() -> str:
    """Report whether the API key is valid, listing projects when it is."""
    status = await transcription.check_api_status()
    if not status["valid"]:
        return f"## Deepgram API Status: Invalid\n\nError: {status.get('error', 'Unknown')}"

    report = ["## Deepgram API Status: Valid\n"]
    projects = status.get("projects", [])
    if projects:
        report.append("### Projects")
        report.extend(
            f"- **{project['name']}** (`{project['id']}`)" for project in projects
        )
    return "\n".join(report)
# ---------------------------------------------------------------------------
# Custom HTTP endpoints (FastMCP custom_route)
# ---------------------------------------------------------------------------
@mcp.custom_route("/health", methods=["GET"])
async def health_endpoint(request: Request) -> Response:
    """Liveness probe: always answers with a static JSON payload."""
    payload = {"status": "ok", "service": "deepgram-mcp"}
    return JSONResponse(payload)
@mcp.custom_route("/upload", methods=["POST"])
async def upload_endpoint(request: Request) -> Response:
    """Accept a multipart upload in the ``file`` field and store it.

    The uploaded content is read fully into memory before being written
    to the upload directory.

    Returns:
        JSON describing the stored file, or a 400 JSON error when the
        request is not multipart or has no usable ``file`` part.
    """
    content_type = request.headers.get("content-type", "")
    if "multipart/form-data" not in content_type:
        return JSONResponse(
            {"error": "Content-Type must be multipart/form-data"},
            status_code=400,
        )
    form = await request.form()
    upload = form.get("file")
    if upload is None:
        return JSONResponse({"error": "No 'file' field in form data"}, status_code=400)
    # A plain text field named "file" is parsed as str, which has neither
    # .read() nor .filename; reject it instead of failing with a 500.
    if isinstance(upload, str):
        return JSONResponse(
            {"error": "'file' field must be a file upload"},
            status_code=400,
        )
    content = await upload.read()
    result = await file_manager.save_upload(upload.filename or "upload", content)
    return JSONResponse(result)
@mcp.custom_route("/files/{name:path}", methods=["GET"])
async def files_endpoint(request: Request) -> Response:
    """Serve a generated TTS file for download.

    Returns a 404 JSON error when the requested name does not resolve to
    a file inside the TTS output directory.
    """
    name = request.path_params["name"]
    file_path = file_manager.get_file_path(file_manager.TTS_DIR, name)
    if file_path is None:
        return JSONResponse({"error": "File not found"}, status_code=404)
    # The route accepts slashes in ``name``; advertise the resolved file's
    # own name so the download filename never contains path components.
    return FileResponse(str(file_path), filename=file_path.name)
# ---------------------------------------------------------------------------
# Run server
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Bind address and port come from the environment, with the same
    # defaults the container image uses.
    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", "8009"))
    mcp.run(transport="http", host=host, port=port)

View File

@@ -0,0 +1,230 @@
"""FFmpeg-based audio splitting for files exceeding the Deepgram size limit."""
import asyncio
import json
import shutil
import tempfile
from pathlib import Path
async def get_audio_duration(file_path: Path) -> float:
    """Return the duration of ``file_path`` in seconds as reported by ffprobe.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status.
    """
    command = [
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        str(file_path),
    ]
    proc = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(
            f"ffprobe failed (exit {proc.returncode}): {err.decode().strip()}"
        )
    # ffprobe reports the duration as a string under format.duration.
    return float(json.loads(out)["format"]["duration"])
def get_file_size_mb(file_path: Path) -> float:
    """Return the size of ``file_path`` in mebibytes (bytes / 1024 / 1024)."""
    bytes_per_mb = 1024 * 1024
    size_bytes = file_path.stat().st_size
    return size_bytes / bytes_per_mb
async def split_audio(
    file_path: Path,
    max_chunk_mb: int = 1500,
) -> list[Path]:
    """Split an audio file into chunks of roughly ``max_chunk_mb`` each.

    A file already within the limit is returned as ``[file_path]``.
    Otherwise ffmpeg's segment muxer cuts it with stream copy (no
    re-encoding) into a fresh temporary directory; the segment length is
    derived from the file's average size per second. The temporary
    directory is removed only when splitting fails.

    Raises:
        ValueError: If the reported duration is not positive.
        RuntimeError: If ffmpeg fails or produces no chunks.
    """
    size_mb = get_file_size_mb(file_path)
    if size_mb <= max_chunk_mb:
        return [file_path]

    duration = await get_audio_duration(file_path)
    if duration <= 0:
        raise ValueError(f"Invalid audio duration: {duration}s")

    # Seconds of audio per chunk, never below one second.
    segment_time = max(1, int(duration * max_chunk_mb / size_mb))

    workdir = Path(tempfile.mkdtemp(prefix="deepgram_chunks_"))
    suffix = file_path.suffix or ".wav"
    command = [
        "ffmpeg",
        "-i", str(file_path),
        "-f", "segment",
        "-segment_time", str(segment_time),
        "-c", "copy",
        "-v", "warning",
        str(workdir / f"chunk_%03d{suffix}"),
    ]
    proc = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, err = await proc.communicate()
    if proc.returncode != 0:
        shutil.rmtree(workdir, ignore_errors=True)
        raise RuntimeError(
            f"ffmpeg split failed (exit {proc.returncode}): {err.decode().strip()}"
        )

    chunks = sorted(workdir.glob(f"chunk_*{suffix}"))
    if chunks:
        return chunks
    shutil.rmtree(workdir, ignore_errors=True)
    raise RuntimeError("ffmpeg produced no output chunks")
def merge_transcription_results(
    results: list[dict],
    chunk_durations: list[float],
) -> dict:
    """Merge per-chunk Deepgram transcription responses into one response.

    Every ``start``/``end`` timestamp of chunk *i* is shifted by the summed
    durations of chunks ``0 .. i-1`` so the pieces line up on one timeline.

    Args:
        results: One response dict per chunk, in playback order.
        chunk_durations: Duration in seconds of each chunk, same order.

    Returns:
        ``{}`` for no results, the single response itself when there is only
        one, otherwise a new dict based on the first response in which:

        * the first channel's first alternative carries the joined
          transcript, the shifted words and (when present) paragraphs;
        * ``utterances`` are shifted and concatenated;
        * ``topics`` / ``entities`` / ``sentiments`` segments and summaries
          are concatenated as-is (nothing inside them is re-based);
        * ``metadata["duration"]``, when present, is the sum of
          ``chunk_durations``.

        The input responses are not modified.

    Raises:
        IndexError: If ``chunk_durations`` has fewer entries than ``results``.
    """
    if not results:
        return {}
    if len(results) == 1:
        return results[0]

    def shift(item: dict, offset: float) -> dict:
        """Return a copy of ``item`` with start/end moved forward by ``offset``."""
        return {
            **item,
            "start": round(item.get("start", 0) + offset, 3),
            "end": round(item.get("end", 0) + offset, 3),
        }

    # offsets[i] is where chunk i begins on the merged timeline.
    offsets = [0.0]
    for dur in chunk_durations[:-1]:
        offsets.append(offsets[-1] + dur)

    transcript_parts: list[str] = []
    merged_words: list[dict] = []
    merged_paragraphs: list[dict] = []
    merged_utterances: list[dict] = []
    merged_topics: list[dict] = []
    merged_entities: list[dict] = []
    merged_summaries: list[dict] = []
    merged_sentiments: list[dict] = []

    for idx, result in enumerate(results):
        offset = offsets[idx]
        res = result.get("results", {})

        # Only the first channel's first alternative is merged.
        channels = res.get("channels", [])
        if channels:
            alternatives = channels[0].get("alternatives") or [{}]
            alt = alternatives[0]

            transcript = alt.get("transcript", "")
            if transcript:
                transcript_parts.append(transcript)

            merged_words.extend(shift(w, offset) for w in alt.get("words", []))

            for para in alt.get("paragraphs", {}).get("paragraphs", []):
                adjusted = shift(para, offset)
                if "sentences" in adjusted:
                    adjusted["sentences"] = [
                        shift(s, offset) for s in adjusted["sentences"]
                    ]
                merged_paragraphs.append(adjusted)

        # Utterances (diarization)
        for utt in res.get("utterances", []):
            adjusted = shift(utt, offset)
            if "words" in adjusted:
                adjusted["words"] = [shift(w, offset) for w in adjusted["words"]]
            merged_utterances.append(adjusted)

        # Topics, entities, summaries, sentiments -- concatenate lists
        merged_topics.extend(res.get("topics", {}).get("segments", []))
        merged_entities.extend(res.get("entities", {}).get("segments", []))
        merged_summaries.extend(
            res.get("summary", {}).get("results", [])
            or res.get("summaries", [])
        )
        merged_sentiments.extend(res.get("sentiments", {}).get("segments", []))

    # Build the output on fresh copies of every container that gets written
    # to, so the caller's first response is left exactly as it was passed in.
    first = results[0]
    base = dict(first)
    merged_results = dict(first.get("results", {}))
    base["results"] = merged_results

    if merged_results.get("channels"):
        channel = dict(merged_results["channels"][0])
        alternatives = channel.get("alternatives") or [{}]
        alt = dict(alternatives[0])
        alt["transcript"] = " ".join(transcript_parts)
        alt["words"] = merged_words
        if merged_paragraphs:
            alt["paragraphs"] = {"paragraphs": merged_paragraphs}
        channel["alternatives"] = [alt]
        merged_results["channels"] = [channel]

    if merged_utterances:
        merged_results["utterances"] = merged_utterances
    if merged_topics:
        merged_results["topics"] = {
            **(merged_results.get("topics") or {}),
            "segments": merged_topics,
        }
    if merged_entities:
        merged_results["entities"] = {
            **(merged_results.get("entities") or {}),
            "segments": merged_entities,
        }
    if merged_summaries:
        merged_results["summaries"] = merged_summaries
    if merged_sentiments:
        merged_results["sentiments"] = {
            **(merged_results.get("sentiments") or {}),
            "segments": merged_sentiments,
        }

    # The first chunk's metadata only knows its own length; report the total.
    metadata = base.get("metadata")
    if isinstance(metadata, dict) and "duration" in metadata:
        base["metadata"] = {**metadata, "duration": sum(chunk_durations)}

    return base
def cleanup_chunks(chunk_paths: list[Path]) -> None:
    """Delete temporary chunk files and the temp directories that hold them.

    Only paths whose parent directory name starts with ``deepgram_chunks_``
    (the prefix ``split_audio`` uses for its temp directory) are removed.
    Any other path is left alone: ``split_audio`` returns the original file
    unchanged when no split was needed, and that file must not be deleted.

    Removal is best-effort; filesystem errors are ignored.

    Args:
        chunk_paths: Paths previously returned by ``split_audio``.
    """
    temp_dirs: set[Path] = set()
    for path in chunk_paths:
        parent = path.parent
        if not parent.name.startswith("deepgram_chunks_"):
            # Not one of our temp chunks (e.g. the caller's source file).
            continue
        temp_dirs.add(parent)
        try:
            if path.is_file():
                path.unlink()
        except OSError:
            # Best-effort: the directory removal below retries anyway.
            pass

    for temp_dir in temp_dirs:
        shutil.rmtree(temp_dir, ignore_errors=True)

View File

@@ -0,0 +1,211 @@
"""Speech-to-text transcription via Deepgram REST API (httpx)."""
import os
from pathlib import Path
from typing import Union
import httpx
# Deepgram speech-to-text endpoint; every transcription request in this
# module is POSTed here.
DEEPGRAM_API_URL = "https://api.deepgram.com/v1/listen"

# Content-Type header value for each recognised file extension. Keys are
# lower-case and include the leading dot; _get_mime_type() falls back to
# "application/octet-stream" for anything not listed.
MIME_TYPES: dict[str, str] = {
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
    ".m4a": "audio/mp4",
    ".flac": "audio/flac",
    ".ogg": "audio/ogg",
    ".webm": "audio/webm",
    ".wma": "audio/x-ms-wma",
    ".aac": "audio/aac",
    ".mp4": "video/mp4",
}

# Size threshold in megabytes: transcribe() uploads files up to this size in
# one request and routes larger ones through _transcribe_large_file().
MAX_FILE_SIZE_MB = 2000
def _get_api_key() -> str:
key = os.getenv("DEEPGRAM_API_KEY", "")
if not key:
raise ValueError("DEEPGRAM_API_KEY environment variable is not set")
return key
def _get_mime_type(file_path: Path) -> str:
    """Pick the Content-Type for ``file_path`` from its extension.

    The extension is matched case-insensitively against ``MIME_TYPES``;
    unknown extensions yield ``application/octet-stream``.
    """
    extension = file_path.suffix.lower()
    return MIME_TYPES.get(extension, "application/octet-stream")
def build_query_params(params: dict) -> dict:
    """Translate tool keyword arguments into Deepgram query parameters.

    * Entries whose value is ``None`` are ignored.
    * Known scalar options are copied through; booleans become the strings
      ``"true"`` / ``"false"``.
    * ``diarize`` is set to ``"true"`` unless the caller supplied a value.
    * Known multi-value options accept either a comma-separated string
      (split, trimmed, blanks dropped) or a list (used as given) and are
      emitted as lists; other value types and empty results are skipped.
    * Keys outside the two known sets are dropped.
    """
    scalar_options = (
        "model", "version", "language", "detect_language",
        "smart_format", "punctuate", "paragraphs", "numerals",
        "measurements", "dictation",
        "diarize", "utterances", "utt_split",
        "summarize", "topics", "sentiment", "entities", "intents",
        "profanity_filter", "filler_words",
        "multichannel",
        "encoding", "sample_rate",
        "keyterm",
    )
    multi_value_options = (
        "custom_topics", "custom_intents", "search",
        "redact", "replace", "keywords",
    )

    supplied = {
        name: value for name, value in params.items() if value is not None
    }
    query: dict = {}

    # Scalars: pass through, spelling booleans the way the API expects.
    for name in scalar_options:
        if name not in supplied:
            continue
        value = supplied[name]
        query[name] = str(value).lower() if isinstance(value, bool) else value

    # Diarization is on unless the caller said otherwise.
    query.setdefault("diarize", "true")

    # Multi-value options become lists (sent as repeated query params).
    for name in multi_value_options:
        if name not in supplied:
            continue
        value = supplied[name]
        if isinstance(value, str):
            entries = [part.strip() for part in value.split(",") if part.strip()]
        elif isinstance(value, list):
            entries = value
        else:
            continue
        if entries:
            query[name] = entries

    return query
async def transcribe(
    source: Union[str, Path, bytes],
    options: dict,
) -> dict:
    """Send audio to Deepgram for transcription and return the parsed JSON.

    ``source`` decides how the audio is delivered:

    * a ``str`` starting with ``http://`` or ``https://`` is handed to
      Deepgram as a remote URL inside a JSON body;
    * any other ``str``, or a ``Path``, is treated as a local file. Files
      larger than ``MAX_FILE_SIZE_MB`` go through ``_transcribe_large_file``;
      smaller ones are read into memory and uploaded with a Content-Type
      chosen from the file extension;
    * ``bytes`` are uploaded as ``application/octet-stream``.

    ``options`` is turned into query parameters by ``build_query_params``.

    Raises ``FileNotFoundError`` when a local path is not a file,
    ``TypeError`` for any other ``source`` type, and whatever
    ``raise_for_status`` raises for an unsuccessful HTTP response.
    """
    token = _get_api_key()
    query = build_query_params(options)
    auth_headers = {"Authorization": f"Token {token}"}

    async def post(content_type: str, **body) -> dict:
        """POST one request to the listen endpoint and decode the reply."""
        request_headers = {**auth_headers, "Content-Type": content_type}
        async with httpx.AsyncClient(timeout=600.0) as client:
            response = await client.post(
                DEEPGRAM_API_URL,
                params=query,
                headers=request_headers,
                **body,
            )
            response.raise_for_status()
            return response.json()

    # Remote URL: Deepgram fetches the audio itself.
    is_remote = isinstance(source, str) and source.startswith(
        ("http://", "https://")
    )
    if is_remote:
        return await post("application/json", json={"url": source})

    # Local file, given as a string or a Path.
    if isinstance(source, (str, Path)):
        audio_path = Path(source)
        if not audio_path.is_file():
            raise FileNotFoundError(f"Audio file not found: {audio_path}")

        megabytes = audio_path.stat().st_size / (1024 * 1024)
        if megabytes > MAX_FILE_SIZE_MB:
            # Too big for a single upload: split, transcribe, merge.
            return await _transcribe_large_file(audio_path, query, auth_headers)

        payload = audio_path.read_bytes()
        return await post(_get_mime_type(audio_path), content=payload)

    # Raw audio already in memory.
    if isinstance(source, bytes):
        return await post("application/octet-stream", content=source)

    raise TypeError(f"Unsupported source type: {type(source)}")
async def _transcribe_large_file(
    file_path: Path, query_params: dict, headers: dict
) -> dict:
    """Split a large file into chunks, transcribe each, and merge results.

    Args:
        file_path: Audio file to split via ``splitter.split_audio``.
        query_params: Query parameters sent unchanged with every chunk.
        headers: Base request headers (authorization); a per-chunk
            ``Content-Type`` is added on a copy, the dict itself is not
            modified.

    Returns:
        The merged response from ``splitter.merge_transcription_results``.
        Each chunk's duration is read from its response ``metadata`` and
        defaults to ``0.0`` when absent.

    Chunks are uploaded one after another. The chunk files are cleaned up
    whether or not transcription succeeds; an HTTP error on any chunk
    propagates from ``raise_for_status``.
    """
    from . import splitter

    chunks = await splitter.split_audio(file_path)
    try:
        results: list[dict] = []
        chunk_durations: list[float] = []

        # One client for the whole batch so connections are reused between
        # chunk uploads instead of being re-established every time.
        async with httpx.AsyncClient(timeout=600.0) as client:
            for chunk in chunks:
                chunk_headers = {
                    **headers,
                    "Content-Type": _get_mime_type(chunk),
                }
                resp = await client.post(
                    DEEPGRAM_API_URL,
                    params=query_params,
                    headers=chunk_headers,
                    content=chunk.read_bytes(),
                )
                resp.raise_for_status()
                result = resp.json()
                results.append(result)

                # "metadata" may be missing or null; treat both as no duration.
                duration = (result.get("metadata") or {}).get("duration", 0.0)
                chunk_durations.append(duration)

        return splitter.merge_transcription_results(results, chunk_durations)
    finally:
        splitter.cleanup_chunks(chunks)
async def check_api_status() -> dict:
    """Check the configured Deepgram API key by listing its projects.

    Never raises: any exception (missing key, network failure, HTTP error
    status, unexpected payload) is reported in the result instead.

    Returns a dict with:
        ``valid``: ``True`` when the projects request succeeded.
        ``projects``: list of ``{"id", "name"}`` dicts (empty on failure).
        ``error``: ``None`` on success, otherwise the exception text.
    """
    projects_url = "https://api.deepgram.com/v1/projects"
    try:
        auth_headers = {"Authorization": f"Token {_get_api_key()}"}
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.get(projects_url, headers=auth_headers)
            response.raise_for_status()
            payload = response.json()

        projects = []
        for entry in payload.get("projects", []):
            projects.append(
                {"id": entry.get("project_id", ""), "name": entry.get("name", "")}
            )
    except Exception as exc:
        return {"valid": False, "projects": [], "error": str(exc)}

    return {"valid": True, "projects": projects, "error": None}

View File

@@ -0,0 +1,197 @@
"""Deepgram Text-to-Speech wrapper using Aura-2 voices (httpx REST API)."""
from __future__ import annotations
import os
import time
import httpx
# Deepgram text-to-speech endpoint that text_to_speech() POSTs to.
DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak"

# File extension (without the dot) used in suggested output filenames for
# each encoding name. text_to_speech() falls back to the encoding name itself
# when an encoding is not listed here.
ENCODING_TO_EXT: dict[str, str] = {
    "mp3": "mp3",
    "linear16": "wav",
    "wav": "wav",
    "flac": "flac",
    "opus": "opus",
    "aac": "aac",
    "mulaw": "wav",
}
# Real Deepgram Aura-2 voice IDs (format: aura-2-{name}-{lang})
#
# Static voice catalogue served by list_voices() and get_voice_info().
# Every entry is a dict of strings with the keys:
#   id          - model identifier passed to the TTS API
#   name        - display name of the voice
#   language    - short language code ("en", "es", ...)
#   locale      - language-region code ("en-US", "es-MX", ...)
#   gender      - "female" or "male"
#   description - short free-text characterisation
# NOTE(review): this table is maintained by hand in this file; confirm the
# IDs, locales and gender tags against Deepgram's published voice list.
VOICES: list[dict[str, str]] = [
    # English (US) - Feminine
    {"id": "aura-2-asteria-en", "name": "Asteria", "language": "en", "locale": "en-US", "gender": "female", "description": "Warm professional"},
    {"id": "aura-2-luna-en", "name": "Luna", "language": "en", "locale": "en-US", "gender": "female", "description": "Soft gentle"},
    {"id": "aura-2-athena-en", "name": "Athena", "language": "en", "locale": "en-US", "gender": "female", "description": "Authoritative"},
    {"id": "aura-2-aurora-en", "name": "Aurora", "language": "en", "locale": "en-US", "gender": "female", "description": "Bright energetic"},
    {"id": "aura-2-thalia-en", "name": "Thalia", "language": "en", "locale": "en-US", "gender": "female", "description": "Natural conversational"},
    {"id": "aura-2-andromeda-en", "name": "Andromeda", "language": "en", "locale": "en-US", "gender": "female", "description": "Clear articulate"},
    {"id": "aura-2-helena-en", "name": "Helena", "language": "en", "locale": "en-US", "gender": "female", "description": "Elegant polished"},
    {"id": "aura-2-callista-en", "name": "Callista", "language": "en", "locale": "en-US", "gender": "female", "description": "Friendly upbeat"},
    {"id": "aura-2-cora-en", "name": "Cora", "language": "en", "locale": "en-US", "gender": "female", "description": "Calm soothing"},
    {"id": "aura-2-electra-en", "name": "Electra", "language": "en", "locale": "en-US", "gender": "female", "description": "Dynamic expressive"},
    {"id": "aura-2-iris-en", "name": "Iris", "language": "en", "locale": "en-US", "gender": "female", "description": "Bright cheerful"},
    {"id": "aura-2-juno-en", "name": "Juno", "language": "en", "locale": "en-US", "gender": "female", "description": "Confident mature"},
    {"id": "aura-2-minerva-en", "name": "Minerva", "language": "en", "locale": "en-US", "gender": "female", "description": "Wise scholarly"},
    {"id": "aura-2-ophelia-en", "name": "Ophelia", "language": "en", "locale": "en-US", "gender": "female", "description": "Dramatic expressive"},
    {"id": "aura-2-phoebe-en", "name": "Phoebe", "language": "en", "locale": "en-US", "gender": "female", "description": "Youthful fresh"},
    {"id": "aura-2-selene-en", "name": "Selene", "language": "en", "locale": "en-US", "gender": "female", "description": "Serene ethereal"},
    {"id": "aura-2-vesta-en", "name": "Vesta", "language": "en", "locale": "en-US", "gender": "female", "description": "Warm nurturing"},
    {"id": "aura-2-cordelia-en", "name": "Cordelia", "language": "en", "locale": "en-US", "gender": "female", "description": "Regal composed"},
    {"id": "aura-2-delia-en", "name": "Delia", "language": "en", "locale": "en-US", "gender": "female", "description": "Light melodic"},
    {"id": "aura-2-harmonia-en", "name": "Harmonia", "language": "en", "locale": "en-US", "gender": "female", "description": "Balanced harmonious"},
    {"id": "aura-2-amalthea-en", "name": "Amalthea", "language": "en", "locale": "en-US", "gender": "female", "description": "Gentle nurturing"},
    {"id": "aura-2-janus-en", "name": "Janus", "language": "en", "locale": "en-US", "gender": "female", "description": "Versatile adaptive"},
    # English (US) - Masculine
    {"id": "aura-2-orion-en", "name": "Orion", "language": "en", "locale": "en-US", "gender": "male", "description": "Deep resonant"},
    {"id": "aura-2-arcas-en", "name": "Arcas", "language": "en", "locale": "en-US", "gender": "male", "description": "Youthful energetic"},
    {"id": "aura-2-orpheus-en", "name": "Orpheus", "language": "en", "locale": "en-US", "gender": "male", "description": "Expressive poetic"},
    {"id": "aura-2-zeus-en", "name": "Zeus", "language": "en", "locale": "en-US", "gender": "male", "description": "Commanding powerful"},
    {"id": "aura-2-apollo-en", "name": "Apollo", "language": "en", "locale": "en-US", "gender": "male", "description": "Bright confident"},
    {"id": "aura-2-atlas-en", "name": "Atlas", "language": "en", "locale": "en-US", "gender": "male", "description": "Strong steady"},
    {"id": "aura-2-hermes-en", "name": "Hermes", "language": "en", "locale": "en-US", "gender": "male", "description": "Quick articulate"},
    {"id": "aura-2-jupiter-en", "name": "Jupiter", "language": "en", "locale": "en-US", "gender": "male", "description": "Authoritative warm"},
    {"id": "aura-2-mars-en", "name": "Mars", "language": "en", "locale": "en-US", "gender": "male", "description": "Bold assertive"},
    {"id": "aura-2-neptune-en", "name": "Neptune", "language": "en", "locale": "en-US", "gender": "male", "description": "Calm deep"},
    {"id": "aura-2-odysseus-en", "name": "Odysseus", "language": "en", "locale": "en-US", "gender": "male", "description": "Storyteller adventurous"},
    {"id": "aura-2-pluto-en", "name": "Pluto", "language": "en", "locale": "en-US", "gender": "male", "description": "Dark mysterious"},
    {"id": "aura-2-saturn-en", "name": "Saturn", "language": "en", "locale": "en-US", "gender": "male", "description": "Mature wise"},
    {"id": "aura-2-aries-en", "name": "Aries", "language": "en", "locale": "en-US", "gender": "male", "description": "Energetic dynamic"},
    # English (GB)
    {"id": "aura-2-pandora-en", "name": "Pandora", "language": "en", "locale": "en-GB", "gender": "female", "description": "British female"},
    {"id": "aura-2-draco-en", "name": "Draco", "language": "en", "locale": "en-GB", "gender": "male", "description": "British male"},
    # English (AU)
    {"id": "aura-2-theia-en", "name": "Theia", "language": "en", "locale": "en-AU", "gender": "female", "description": "Australian female"},
    {"id": "aura-2-hyperion-en", "name": "Hyperion", "language": "en", "locale": "en-AU", "gender": "male", "description": "Australian male"},
    # Spanish - Mexican
    {"id": "aura-2-estrella-es", "name": "Estrella", "language": "es", "locale": "es-MX", "gender": "female", "description": "Mexican female"},
    {"id": "aura-2-olivia-es", "name": "Olivia", "language": "es", "locale": "es-MX", "gender": "female", "description": "Mexican female warm"},
    {"id": "aura-2-sirio-es", "name": "Sirio", "language": "es", "locale": "es-MX", "gender": "male", "description": "Mexican male"},
    {"id": "aura-2-javier-es", "name": "Javier", "language": "es", "locale": "es-MX", "gender": "male", "description": "Mexican male warm"},
    {"id": "aura-2-luciano-es", "name": "Luciano", "language": "es", "locale": "es-MX", "gender": "male", "description": "Mexican male expressive"},
    {"id": "aura-2-valerio-es", "name": "Valerio", "language": "es", "locale": "es-MX", "gender": "male", "description": "Mexican male confident"},
    # Spanish - Peninsular
    {"id": "aura-2-carina-es", "name": "Carina", "language": "es", "locale": "es-ES", "gender": "female", "description": "Castilian female"},
    {"id": "aura-2-diana-es", "name": "Diana", "language": "es", "locale": "es-ES", "gender": "female", "description": "Castilian female elegant"},
    {"id": "aura-2-agustina-es", "name": "Agustina", "language": "es", "locale": "es-ES", "gender": "female", "description": "Castilian female classic"},
    {"id": "aura-2-silvia-es", "name": "Silvia", "language": "es", "locale": "es-ES", "gender": "female", "description": "Castilian female bright"},
    {"id": "aura-2-nestor-es", "name": "Nestor", "language": "es", "locale": "es-ES", "gender": "male", "description": "Castilian male"},
    {"id": "aura-2-alvaro-es", "name": "Alvaro", "language": "es", "locale": "es-ES", "gender": "male", "description": "Castilian male confident"},
    # Spanish - Colombian / Argentine / LatAm
    {"id": "aura-2-celeste-es", "name": "Celeste", "language": "es", "locale": "es-CO", "gender": "female", "description": "Colombian female"},
    {"id": "aura-2-gloria-es", "name": "Gloria", "language": "es", "locale": "es-CO", "gender": "female", "description": "Colombian female warm"},
    {"id": "aura-2-antonia-es", "name": "Antonia", "language": "es", "locale": "es-AR", "gender": "female", "description": "Argentine female"},
    {"id": "aura-2-aquila-es", "name": "Aquila", "language": "es", "locale": "es-419", "gender": "male", "description": "Latin American male"},
    {"id": "aura-2-selena-es", "name": "Selena", "language": "es", "locale": "es-419", "gender": "female", "description": "Latin American female"},
    # German
    {"id": "aura-2-elara-de", "name": "Elara", "language": "de", "locale": "de-DE", "gender": "female", "description": "German female natural"},
    {"id": "aura-2-aurelia-de", "name": "Aurelia", "language": "de", "locale": "de-DE", "gender": "female", "description": "German female elegant"},
    {"id": "aura-2-lara-de", "name": "Lara", "language": "de", "locale": "de-DE", "gender": "female", "description": "German female youthful"},
    {"id": "aura-2-kara-de", "name": "Kara", "language": "de", "locale": "de-DE", "gender": "female", "description": "German female confident"},
    {"id": "aura-2-viktoria-de", "name": "Viktoria", "language": "de", "locale": "de-DE", "gender": "female", "description": "German female strong"},
    {"id": "aura-2-julius-de", "name": "Julius", "language": "de", "locale": "de-DE", "gender": "male", "description": "German male professional"},
    {"id": "aura-2-fabian-de", "name": "Fabian", "language": "de", "locale": "de-DE", "gender": "male", "description": "German male warm"},
    # French
    {"id": "aura-2-agathe-fr", "name": "Agathe", "language": "fr", "locale": "fr-FR", "gender": "female", "description": "French female"},
    {"id": "aura-2-hector-fr", "name": "Hector", "language": "fr", "locale": "fr-FR", "gender": "male", "description": "French male"},
    # Dutch
    {"id": "aura-2-beatrix-nl", "name": "Beatrix", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female classic"},
    {"id": "aura-2-daphne-nl", "name": "Daphne", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female natural"},
    {"id": "aura-2-cornelia-nl", "name": "Cornelia", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female warm"},
    {"id": "aura-2-hestia-nl", "name": "Hestia", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female gentle"},
    {"id": "aura-2-rhea-nl", "name": "Rhea", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female bright"},
    {"id": "aura-2-leda-nl", "name": "Leda", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female elegant"},
    {"id": "aura-2-sander-nl", "name": "Sander", "language": "nl", "locale": "nl-NL", "gender": "male", "description": "Dutch male natural"},
    {"id": "aura-2-lars-nl", "name": "Lars", "language": "nl", "locale": "nl-NL", "gender": "male", "description": "Dutch male confident"},
    {"id": "aura-2-roman-nl", "name": "Roman", "language": "nl", "locale": "nl-NL", "gender": "male", "description": "Dutch male warm"},
    # Italian
    {"id": "aura-2-melia-it", "name": "Melia", "language": "it", "locale": "it-IT", "gender": "female", "description": "Italian female natural"},
    {"id": "aura-2-maia-it", "name": "Maia", "language": "it", "locale": "it-IT", "gender": "female", "description": "Italian female warm"},
    {"id": "aura-2-cinzia-it", "name": "Cinzia", "language": "it", "locale": "it-IT", "gender": "female", "description": "Italian female elegant"},
    {"id": "aura-2-livia-it", "name": "Livia", "language": "it", "locale": "it-IT", "gender": "female", "description": "Italian female classic"},
    {"id": "aura-2-demetra-it", "name": "Demetra", "language": "it", "locale": "it-IT", "gender": "female", "description": "Italian female strong"},
    {"id": "aura-2-elio-it", "name": "Elio", "language": "it", "locale": "it-IT", "gender": "male", "description": "Italian male bright"},
    {"id": "aura-2-flavio-it", "name": "Flavio", "language": "it", "locale": "it-IT", "gender": "male", "description": "Italian male warm"},
    {"id": "aura-2-cesare-it", "name": "Cesare", "language": "it", "locale": "it-IT", "gender": "male", "description": "Italian male authoritative"},
    {"id": "aura-2-perseo-it", "name": "Perseo", "language": "it", "locale": "it-IT", "gender": "male", "description": "Italian male dynamic"},
    {"id": "aura-2-dionisio-it", "name": "Dionisio", "language": "it", "locale": "it-IT", "gender": "male", "description": "Italian male expressive"},
    # Japanese
    {"id": "aura-2-uzume-ja", "name": "Uzume", "language": "ja", "locale": "ja-JP", "gender": "female", "description": "Japanese female natural"},
    {"id": "aura-2-izanami-ja", "name": "Izanami", "language": "ja", "locale": "ja-JP", "gender": "female", "description": "Japanese female elegant"},
    {"id": "aura-2-ebisu-ja", "name": "Ebisu", "language": "ja", "locale": "ja-JP", "gender": "male", "description": "Japanese male warm"},
    {"id": "aura-2-fujin-ja", "name": "Fujin", "language": "ja", "locale": "ja-JP", "gender": "male", "description": "Japanese male dynamic"},
    {"id": "aura-2-ama-ja", "name": "Ama", "language": "ja", "locale": "ja-JP", "gender": "male", "description": "Japanese male natural"},
]
def list_voices(language: str | None = None) -> list[dict[str, str]]:
    """List known voices, sorted by language and then by name.

    With ``language`` left as ``None`` every voice is returned. Otherwise
    only voices whose short language code (``"en"``) or full locale
    (``"en-US"``) equals ``language``, ignoring case, are kept.
    """
    def by_language_then_name(voice: dict[str, str]) -> tuple[str, str]:
        return (voice["language"], voice["name"])

    if language is None:
        selected = VOICES
    else:
        wanted = language.lower()
        selected = [
            voice
            for voice in VOICES
            if voice["language"].lower() == wanted
            or voice["locale"].lower() == wanted
        ]
    return sorted(selected, key=by_language_then_name)
def get_voice_info(voice_id: str) -> dict[str, str] | None:
    """Look up a voice by its exact ID; ``None`` means no such voice exists."""
    matches = (voice for voice in VOICES if voice["id"] == voice_id)
    return next(matches, None)
async def text_to_speech(
    text: str,
    model: str = "aura-2-asteria-en",
    encoding: str = "mp3",
    sample_rate: int = 24000,
    container: str | None = None,
) -> tuple[bytes, str]:
    """Convert text to speech using the Deepgram Aura-2 REST API.

    Args:
        text: Text to synthesise; sent as the JSON request body.
        model: Voice/model ID, e.g. ``"aura-2-asteria-en"``.
        encoding: Audio encoding requested from the API.
        sample_rate: Output sample rate in Hz. Only sent for encodings that
            accept it (see the note in the body).
        container: Optional container format; omitted from the request
            when ``None``.

    Returns:
        ``(audio_bytes, suggested_filename)``. The filename has the form
        ``tts_{unix_timestamp}_{suffix}.{ext}`` where ``suffix`` is the part
        of ``model`` after its last ``-`` and ``ext`` comes from
        ``ENCODING_TO_EXT`` (falling back to the encoding name).

    Raises:
        ValueError: If ``DEEPGRAM_API_KEY`` is unset or empty.
        httpx.HTTPStatusError: If the API answers with an error status.
    """
    api_key = os.environ.get("DEEPGRAM_API_KEY", "")
    if not api_key:
        raise ValueError("DEEPGRAM_API_KEY environment variable is not set")

    params: dict = {
        "model": model,
        "encoding": encoding,
    }
    # NOTE(review): Deepgram's TTS media-output docs list mp3, opus and aac
    # as fixed-rate encodings for which sample_rate is "not applicable";
    # sending it (as the default mp3 call used to) can be rejected by the
    # API. Confirm the list against the current documentation.
    if encoding not in ("mp3", "opus", "aac"):
        params["sample_rate"] = str(sample_rate)
    if container is not None:
        params["container"] = container

    headers = {
        "Authorization": f"Token {api_key}",
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient(timeout=120.0) as client:
        resp = await client.post(
            DEEPGRAM_TTS_URL,
            params=params,
            headers=headers,
            json={"text": text},
        )
        resp.raise_for_status()
        audio_bytes = resp.content

    ext = ENCODING_TO_EXT.get(encoding, encoding)
    # For IDs shaped like "aura-2-asteria-en" this is the trailing "en".
    model_short = model.rsplit("-", 1)[-1]
    timestamp = int(time.time())
    filename = f"tts_{timestamp}_{model_short}.{ext}"
    return audio_bytes, filename