diff --git a/deepgram-mcp/.env.example b/deepgram-mcp/.env.example new file mode 100644 index 0000000..d25a316 --- /dev/null +++ b/deepgram-mcp/.env.example @@ -0,0 +1 @@ +DEEPGRAM_API_KEY=your_api_key_here diff --git a/deepgram-mcp/Dockerfile b/deepgram-mcp/Dockerfile new file mode 100644 index 0000000..194779c --- /dev/null +++ b/deepgram-mcp/Dockerfile @@ -0,0 +1,21 @@ +FROM python:3.11-slim + +RUN apt-get update && \ + apt-get install -y --no-install-recommends ffmpeg curl && \ + rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY src/ src/ + +ENV PYTHONPATH=/app/src + +EXPOSE 8009 + +HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \ + CMD curl -f http://localhost:8009/health || exit 1 + +CMD ["python", "-m", "deepgram_mcp.server"] diff --git a/deepgram-mcp/docker-compose.yml b/deepgram-mcp/docker-compose.yml new file mode 100644 index 0000000..98e224f --- /dev/null +++ b/deepgram-mcp/docker-compose.yml @@ -0,0 +1,21 @@ +services: + deepgram-mcp: + build: . 
+ container_name: deepgram-mcp + restart: unless-stopped + ports: + - "8009:8009" + volumes: + - deepgram-uploads:/data/uploads + - deepgram-tts:/data/tts_output + env_file: + - .env + environment: + - UPLOAD_DIR=/data/uploads + - TTS_DIR=/data/tts_output + - HOST=0.0.0.0 + - PORT=8009 + +volumes: + deepgram-uploads: + deepgram-tts: diff --git a/deepgram-mcp/requirements.txt b/deepgram-mcp/requirements.txt new file mode 100644 index 0000000..a988817 --- /dev/null +++ b/deepgram-mcp/requirements.txt @@ -0,0 +1,7 @@ +fastmcp>=2.0.0 +httpx +aiofiles +python-dotenv +python-multipart +starlette +uvicorn diff --git a/deepgram-mcp/src/deepgram_mcp/__init__.py b/deepgram-mcp/src/deepgram_mcp/__init__.py new file mode 100644 index 0000000..6aa553c --- /dev/null +++ b/deepgram-mcp/src/deepgram_mcp/__init__.py @@ -0,0 +1 @@ +# Deepgram MCP Server diff --git a/deepgram-mcp/src/deepgram_mcp/__pycache__/__init__.cpython-312.pyc b/deepgram-mcp/src/deepgram_mcp/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..a78d274 Binary files /dev/null and b/deepgram-mcp/src/deepgram_mcp/__pycache__/__init__.cpython-312.pyc differ diff --git a/deepgram-mcp/src/deepgram_mcp/__pycache__/formatter.cpython-312.pyc b/deepgram-mcp/src/deepgram_mcp/__pycache__/formatter.cpython-312.pyc new file mode 100644 index 0000000..479168c Binary files /dev/null and b/deepgram-mcp/src/deepgram_mcp/__pycache__/formatter.cpython-312.pyc differ diff --git a/deepgram-mcp/src/deepgram_mcp/file_manager.py b/deepgram-mcp/src/deepgram_mcp/file_manager.py new file mode 100644 index 0000000..86f5901 --- /dev/null +++ b/deepgram-mcp/src/deepgram_mcp/file_manager.py @@ -0,0 +1,101 @@ +"""File upload, download, and listing management for Deepgram MCP server.""" + +import os +import re +from datetime import datetime, timezone +from pathlib import Path + +import aiofiles + +UPLOAD_DIR = Path(os.getenv("UPLOAD_DIR", "/data/uploads")) +TTS_DIR = Path(os.getenv("TTS_DIR", "/data/tts_output")) + 
+UPLOAD_DIR.mkdir(parents=True, exist_ok=True) +TTS_DIR.mkdir(parents=True, exist_ok=True) + + +def _sanitize_filename(filename: str) -> str: + """Strip path components and dangerous characters from a filename.""" + # Take only the basename (no directory traversal) + name = Path(filename).name + # Remove any remaining path separators or null bytes + name = re.sub(r'[/\\:\x00]', '', name) + # Collapse whitespace + name = re.sub(r'\s+', '_', name.strip()) + if not name: + name = "unnamed_file" + return name + + +def _timestamp_prefix() -> str: + """Generate a timestamp prefix for collision avoidance.""" + return datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") + + +async def save_upload(filename: str, content: bytes) -> dict: + """Save uploaded file content with a timestamp prefix to avoid collisions. + + Returns dict with filename, path, and size_mb. + """ + safe_name = _sanitize_filename(filename) + prefixed_name = f"{_timestamp_prefix()}_{safe_name}" + dest = UPLOAD_DIR / prefixed_name + + async with aiofiles.open(dest, "wb") as f: + await f.write(content) + + size_mb = round(dest.stat().st_size / (1024 * 1024), 2) + return { + "filename": prefixed_name, + "path": str(dest), + "size_mb": size_mb, + } + + +def list_files(directory: Path) -> list[dict]: + """List files in a directory with name, size_mb, and modified date.""" + if not directory.is_dir(): + return [] + + files = [] + for entry in sorted(directory.iterdir()): + if entry.is_file(): + stat = entry.stat() + files.append({ + "name": entry.name, + "size_mb": round(stat.st_size / (1024 * 1024), 2), + "modified": datetime.fromtimestamp( + stat.st_mtime, tz=timezone.utc + ).isoformat(), + }) + return files + + +def delete_file(directory: Path, filename: str) -> bool: + """Delete a file from the given directory. 
def format_duration(seconds: float) -> str:
    """Render a number of seconds as '32s', '5m 32s' or '1h 5m 32s'.

    Fractional seconds are dropped via ``int()``. Minutes are shown whenever
    hours are shown, even if zero.
    """
    minutes, secs = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    if hours > 0:
        return f"{hours}h {minutes}m {secs}s"
    if minutes > 0:
        return f"{minutes}m {secs}s"
    return f"{secs}s"
"\n\n---\n*[Truncated - full transcript saved to file]*" + return truncated, True + + +def format_transcription(response: dict, include_timestamps: bool = True) -> str: + """Format a Deepgram transcription response into readable markdown. + + Args: + response: Raw Deepgram JSON response dict. + include_timestamps: Whether to include timestamps in transcript output. + + Returns: + Formatted markdown string. + """ + sections: list[str] = [] + + metadata = response.get("metadata") or {} + results = response.get("results") or {} + channels = results.get("channels") or [] + first_alt = {} + if channels: + alts = channels[0].get("alternatives") or [] + if alts: + first_alt = alts[0] + + # --- Metadata header --- + section = _format_metadata(metadata, first_alt) + if section: + sections.append(section) + + # --- Transcript --- + utterances = results.get("utterances") + section = _format_transcript(first_alt, utterances, include_timestamps) + if section: + sections.append(section) + + # --- Summary --- + section = _format_summaries(first_alt) + if section: + sections.append(section) + + # --- Topics --- + section = _format_topics(first_alt) + if section: + sections.append(section) + + # --- Entities --- + section = _format_entities(first_alt) + if section: + sections.append(section) + + # --- Sentiment --- + section = _format_sentiment(first_alt) + if section: + sections.append(section) + + # --- Intents --- + section = _format_intents(first_alt) + if section: + sections.append(section) + + # --- Search Results --- + section = _format_search(first_alt) + if section: + sections.append(section) + + return "\n\n".join(sections) + + +def _format_metadata(metadata: dict, first_alt: dict) -> str: + """Build the metadata header section.""" + lines = ["## Transcription Results"] + + duration = metadata.get("duration") + if duration is not None: + lines.append(f"- **Duration:** {format_duration(duration)}") + + model_info = metadata.get("model_info") + if model_info and 
isinstance(model_info, dict): + for info in model_info.values(): + name = info.get("name") if isinstance(info, dict) else None + if name: + lines.append(f"- **Model:** {name}") + break + + confidence = first_alt.get("confidence") + if confidence is not None: + lines.append(f"- **Confidence:** {confidence * 100:.1f}%") + + num_channels = metadata.get("channels") + if num_channels is not None: + lines.append(f"- **Channels:** {num_channels}") + + return "\n".join(lines) + + +def _format_transcript( + first_alt: dict, + utterances: list[dict] | None, + include_timestamps: bool, +) -> str: + """Build the transcript section using utterances, paragraphs, or plain text.""" + # Prefer utterances (diarized output) + if utterances: + lines = ["### Transcript", ""] + for utt in utterances: + speaker = utt.get("speaker", "?") + text = utt.get("transcript", "").strip() + if include_timestamps: + start = format_timestamp(utt.get("start", 0)) + end = format_timestamp(utt.get("end", 0)) + lines.append(f"**Speaker {speaker}** ({start} - {end}): {text}") + else: + lines.append(f"**Speaker {speaker}**: {text}") + lines.append("") + return "\n".join(lines).rstrip() + + # Fall back to paragraphs + paragraphs_data = first_alt.get("paragraphs") + if paragraphs_data and isinstance(paragraphs_data, dict): + paras = paragraphs_data.get("paragraphs") or [] + if paras: + lines = ["### Transcript", ""] + for para in paras: + speaker = para.get("speaker") + sentences = para.get("sentences") or [] + text = " ".join(s.get("text", "") for s in sentences).strip() + if not text: + continue + if speaker is not None and include_timestamps: + start = format_timestamp(para.get("start", 0)) + end = format_timestamp(para.get("end", 0)) + lines.append( + f"**Speaker {speaker}** ({start} - {end}): {text}" + ) + elif speaker is not None: + lines.append(f"**Speaker {speaker}**: {text}") + else: + lines.append(text) + lines.append("") + return "\n".join(lines).rstrip() + + # Fall back to plain transcript + 
transcript = first_alt.get("transcript", "").strip() + if transcript: + return f"### Transcript\n\n{transcript}" + + return "" + + +def _format_summaries(first_alt: dict) -> str: + """Build the summary section.""" + summaries = first_alt.get("summaries") + if not summaries: + return "" + texts = [s.get("summary", "") for s in summaries if s.get("summary")] + if not texts: + return "" + return "### Summary\n\n" + "\n\n".join(texts) + + +def _format_topics(first_alt: dict) -> str: + """Build the topics section.""" + topics_data = first_alt.get("topics") + if not topics_data or not isinstance(topics_data, dict): + return "" + segments = topics_data.get("segments") or [] + # Collect unique topics with their highest confidence + seen: dict[str, float] = {} + for seg in segments: + for t in seg.get("topics") or []: + topic = t.get("topic", "") + conf = t.get("confidence", 0) + if topic and (topic not in seen or conf > seen[topic]): + seen[topic] = conf + if not seen: + return "" + lines = ["### Topics"] + for topic, conf in sorted(seen.items(), key=lambda x: x[1], reverse=True): + lines.append(f"- **{topic}** ({conf * 100:.1f}%)") + return "\n".join(lines) + + +def _format_entities(first_alt: dict) -> str: + """Build the entities table.""" + entities_data = first_alt.get("entities") + if not entities_data or not isinstance(entities_data, dict): + return "" + segments = entities_data.get("segments") or [] + rows: list[tuple[str, str, float]] = [] + for seg in segments: + for ent in seg.get("entities") or []: + label = ent.get("label", "") + value = ent.get("value", "") + conf = ent.get("confidence", 0) + if label and value: + rows.append((label, value, conf)) + if not rows: + return "" + lines = [ + "### Entities", + "", + "| Type | Value | Confidence |", + "|------|-------|------------|", + ] + for label, value, conf in rows: + lines.append(f"| {label} | {value} | {conf * 100:.1f}% |") + return "\n".join(lines) + + +def _format_sentiment(first_alt: dict) -> str: + 
"""Build the sentiment section.""" + sentiments_data = first_alt.get("sentiments") + if not sentiments_data or not isinstance(sentiments_data, dict): + return "" + + lines = ["### Sentiment"] + + average = sentiments_data.get("average") + if average and isinstance(average, dict): + sentiment = average.get("sentiment", "") + score = average.get("sentiment_score") + if sentiment and score is not None: + lines.append(f"\n**Overall:** {sentiment.capitalize()} ({score:.2f})") + + segments = sentiments_data.get("segments") or [] + if segments: + lines.append("") + lines.append("| Segment | Sentiment | Score |") + lines.append("|---------|-----------|-------|") + for seg in segments: + text = seg.get("text", "").strip() + sentiment = seg.get("sentiment", "") + score = seg.get("sentiment_score") + if text and sentiment and score is not None: + # Truncate long segment text for table readability + display = text if len(text) <= 60 else text[:57] + "..." + lines.append( + f'| "{display}" | {sentiment.capitalize()} | {score:.2f} |' + ) + + if len(lines) <= 1: + return "" + return "\n".join(lines) + + +def _format_intents(first_alt: dict) -> str: + """Build the intents section.""" + intents_data = first_alt.get("intents") + if not intents_data or not isinstance(intents_data, dict): + return "" + segments = intents_data.get("segments") or [] + # Collect unique intents with highest confidence + seen: dict[str, float] = {} + for seg in segments: + for intent in seg.get("intents") or []: + name = intent.get("intent", "") + conf = intent.get("confidence", 0) + if name and (name not in seen or conf > seen[name]): + seen[name] = conf + if not seen: + return "" + lines = ["### Intents"] + for name, conf in sorted(seen.items(), key=lambda x: x[1], reverse=True): + lines.append(f"- **{name}** ({conf * 100:.1f}%)") + return "\n".join(lines) + + +def _format_search(first_alt: dict) -> str: + """Build the search results section with timestamps.""" + search_data = first_alt.get("search") + 
if not search_data: + return "" + + lines = ["### Search Results"] + for group in search_data: + query = group.get("query", "") + hits = group.get("hits") or [] + lines.append(f"\n**\"{query}\"**") + if not hits: + lines.append("No matches found.") + continue + for hit in hits: + snippet = hit.get("snippet", "") + start = hit.get("start", 0) + end = hit.get("end", 0) + conf = hit.get("confidence", 0) + lines.append( + f"- ({format_timestamp(start)} - {format_timestamp(end)}) " + f"*{snippet}* ({conf * 100:.1f}%)" + ) + + if len(lines) <= 1: + return "" + return "\n".join(lines) diff --git a/deepgram-mcp/src/deepgram_mcp/server.py b/deepgram-mcp/src/deepgram_mcp/server.py new file mode 100644 index 0000000..96ba99c --- /dev/null +++ b/deepgram-mcp/src/deepgram_mcp/server.py @@ -0,0 +1,461 @@ +"""Deepgram MCP Server — FastMCP 2.x with custom HTTP routes.""" + +import asyncio +import os +from pathlib import Path + +import aiofiles +from dotenv import load_dotenv +from fastmcp import FastMCP +from starlette.requests import Request +from starlette.responses import FileResponse, JSONResponse, Response + +from deepgram_mcp import file_manager, formatter, transcription, tts + +load_dotenv() + +mcp = FastMCP("Deepgram MCP") + +# --------------------------------------------------------------------------- +# Shared transcription parameter docstring +# --------------------------------------------------------------------------- +_TRANSCRIBE_PARAMS_DOC = """ +Parameters: + model: Deepgram model (nova-3, nova-2, enhanced, base, whisper-large). Default: nova-3 + language: BCP-47 language code (e.g. en, es, fr). Omit for auto-detect. + detect_language: Auto-detect language (bool). + smart_format: Enable smart formatting (bool, default True). + punctuate: Add punctuation (bool). + paragraphs: Split into paragraphs (bool). + numerals: Convert numbers to digits (bool). + measurements: Format measurements (bool). + dictation: Dictation mode with spoken punctuation (bool). 
async def _do_transcribe(source, **kwargs) -> str:
    """Run a transcription and return it as markdown, truncating if too long.

    Args:
        source: Audio source handed to ``transcription.transcribe`` (the
            tools in this file pass a ``Path`` or a URL string).
        **kwargs: Tool options; entries whose value is ``None`` are dropped
            before the request is made.

    Returns:
        The formatted markdown. When it exceeds the formatter's size limit,
        the truncated text is returned and the complete transcript is written
        to a uniquely named file under ``file_manager.TTS_DIR``, whose path
        is appended to the result.
    """
    # Local import: this module does not import datetime at top level.
    from datetime import datetime, timezone

    options = _collect_options(**kwargs)
    result = await transcription.transcribe(source, options)
    # Format once and reuse it for both the reply and the saved copy.
    full_text = formatter.format_transcription(result)
    text, was_truncated = formatter.truncate_result(full_text)
    if was_truncated:
        # A per-call name keeps successive or concurrent long transcripts
        # from overwriting each other.
        stamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
        save_path = file_manager.TTS_DIR / f"full_transcript_{stamp}.md"
        async with aiofiles.open(save_path, "w", encoding="utf-8") as f:
            await f.write(full_text)
        text += f"\n\nFull transcript saved to: {save_path}"
    return text
@mcp.tool(description="Transcribe audio from a file path on the NUC server." + _TRANSCRIBE_PARAMS_DOC)
async def transcribe_file(
    path: str,
    model: str = "nova-3",
    language: str | None = None,
    detect_language: bool | None = None,
    smart_format: bool = True,
    punctuate: bool | None = None,
    paragraphs: bool | None = None,
    numerals: bool | None = None,
    measurements: bool | None = None,
    dictation: bool | None = None,
    diarize: bool = True,
    utterances: bool | None = None,
    utt_split: float | None = None,
    summarize: bool | None = None,
    topics: bool | None = None,
    sentiment: bool | None = None,
    entities: bool | None = None,
    intents: bool | None = None,
    custom_topics: str | None = None,
    custom_intents: str | None = None,
    keywords: str | None = None,
    keyterm: str | None = None,
    search: str | None = None,
    redact: str | None = None,
    profanity_filter: bool | None = None,
    replace: str | None = None,
    filler_words: bool | None = None,
    multichannel: bool | None = None,
    encoding: str | None = None,
    sample_rate: int | None = None,
) -> str:
    """Transcribe a local audio file given its filesystem path.

    Returns an error string when ``path`` is not an existing file; otherwise
    forwards every option to ``_do_transcribe`` and returns its markdown.
    """
    audio = Path(path)
    if not audio.is_file():
        return f"Error: File not found: {path}"

    # Gather the tool options once, then forward them as keyword arguments.
    options = {
        "model": model,
        "language": language,
        "detect_language": detect_language,
        "smart_format": smart_format,
        "punctuate": punctuate,
        "paragraphs": paragraphs,
        "numerals": numerals,
        "measurements": measurements,
        "dictation": dictation,
        "diarize": diarize,
        "utterances": utterances,
        "utt_split": utt_split,
        "summarize": summarize,
        "topics": topics,
        "sentiment": sentiment,
        "entities": entities,
        "intents": intents,
        "custom_topics": custom_topics,
        "custom_intents": custom_intents,
        "keywords": keywords,
        "keyterm": keyterm,
        "search": search,
        "redact": redact,
        "profanity_filter": profanity_filter,
        "replace": replace,
        "filler_words": filler_words,
        "multichannel": multichannel,
        "encoding": encoding,
        "sample_rate": sample_rate,
    }
    return await _do_transcribe(audio, **options)
@mcp.tool(description="Transcribe audio from a public URL." + _TRANSCRIBE_PARAMS_DOC)
async def transcribe_url(
    url: str,
    model: str = "nova-3",
    language: str | None = None,
    detect_language: bool | None = None,
    smart_format: bool = True,
    punctuate: bool | None = None,
    paragraphs: bool | None = None,
    numerals: bool | None = None,
    measurements: bool | None = None,
    dictation: bool | None = None,
    diarize: bool = True,
    utterances: bool | None = None,
    utt_split: float | None = None,
    summarize: bool | None = None,
    topics: bool | None = None,
    sentiment: bool | None = None,
    entities: bool | None = None,
    intents: bool | None = None,
    custom_topics: str | None = None,
    custom_intents: str | None = None,
    keywords: str | None = None,
    keyterm: str | None = None,
    search: str | None = None,
    redact: str | None = None,
    profanity_filter: bool | None = None,
    replace: str | None = None,
    filler_words: bool | None = None,
    multichannel: bool | None = None,
    encoding: str | None = None,
    sample_rate: int | None = None,
) -> str:
    """Transcribe audio fetched from an http(s) URL.

    Returns an error string unless ``url`` begins with ``http://`` or
    ``https://``; otherwise forwards every option to ``_do_transcribe``.
    """
    has_http_scheme = url.startswith("http://") or url.startswith("https://")
    if not has_http_scheme:
        return "Error: URL must start with http:// or https://"

    # Gather the tool options once, then forward them as keyword arguments.
    options = {
        "model": model,
        "language": language,
        "detect_language": detect_language,
        "smart_format": smart_format,
        "punctuate": punctuate,
        "paragraphs": paragraphs,
        "numerals": numerals,
        "measurements": measurements,
        "dictation": dictation,
        "diarize": diarize,
        "utterances": utterances,
        "utt_split": utt_split,
        "summarize": summarize,
        "topics": topics,
        "sentiment": sentiment,
        "entities": entities,
        "intents": intents,
        "custom_topics": custom_topics,
        "custom_intents": custom_intents,
        "keywords": keywords,
        "keyterm": keyterm,
        "search": search,
        "redact": redact,
        "profanity_filter": profanity_filter,
        "replace": replace,
        "filler_words": filler_words,
        "multichannel": multichannel,
        "encoding": encoding,
        "sample_rate": sample_rate,
    }
    return await _do_transcribe(url, **options)
uploaded audio file." + _TRANSCRIBE_PARAMS_DOC) +async def transcribe_uploaded( + filename: str, + model: str = "nova-3", + language: str | None = None, + detect_language: bool | None = None, + smart_format: bool = True, + punctuate: bool | None = None, + paragraphs: bool | None = None, + numerals: bool | None = None, + measurements: bool | None = None, + dictation: bool | None = None, + diarize: bool = True, + utterances: bool | None = None, + utt_split: float | None = None, + summarize: bool | None = None, + topics: bool | None = None, + sentiment: bool | None = None, + entities: bool | None = None, + intents: bool | None = None, + custom_topics: str | None = None, + custom_intents: str | None = None, + keywords: str | None = None, + keyterm: str | None = None, + search: str | None = None, + redact: str | None = None, + profanity_filter: bool | None = None, + replace: str | None = None, + filler_words: bool | None = None, + multichannel: bool | None = None, + encoding: str | None = None, + sample_rate: int | None = None, +) -> str: + """Transcribe a file that was uploaded via the /upload endpoint.""" + file_path = file_manager.get_file_path(file_manager.UPLOAD_DIR, filename) + if file_path is None: + return f"Error: Uploaded file not found: {filename}" + return await _do_transcribe( + file_path, + model=model, language=language, detect_language=detect_language, + smart_format=smart_format, punctuate=punctuate, paragraphs=paragraphs, + numerals=numerals, measurements=measurements, dictation=dictation, + diarize=diarize, utterances=utterances, utt_split=utt_split, + summarize=summarize, topics=topics, sentiment=sentiment, + entities=entities, intents=intents, + custom_topics=custom_topics, custom_intents=custom_intents, + keywords=keywords, keyterm=keyterm, search=search, + redact=redact, profanity_filter=profanity_filter, replace=replace, + filler_words=filler_words, multichannel=multichannel, + encoding=encoding, sample_rate=sample_rate, + ) + + +# 
@mcp.tool(description="Convert text to speech using Deepgram Aura-2 voices. Returns download URL for the generated audio file.")
async def text_to_speech(
    text: str,
    model: str = "aura-2-asteria-en",
    encoding: str = "mp3",
    sample_rate: int = 24000,
    container: str | None = None,
) -> str:
    """Generate speech audio from text and store it for download.

    The audio returned by ``tts.text_to_speech`` is written into
    ``file_manager.TTS_DIR`` under the filename that call provides.

    Args:
        text: Text to synthesise.
        model: Voice model id.
        encoding: Audio encoding passed through to the TTS call.
        sample_rate: Sample rate in Hz passed through to the TTS call.
        container: Optional container format passed through to the TTS call.

    Returns:
        A markdown summary with file name, size, model, encoding and a
        download URL under ``/files/``.
    """
    audio_bytes, filename = await tts.text_to_speech(
        text, model=model, encoding=encoding,
        sample_rate=sample_rate, container=container,
    )
    save_path = file_manager.TTS_DIR / filename
    async with aiofiles.open(save_path, "wb") as f:
        await f.write(audio_bytes)
    size_mb = round(len(audio_bytes) / (1024 * 1024), 2)
    # HOST is the bind address (0.0.0.0 in the compose file) and is useless
    # in a link, so the advertised host is read separately. The default
    # keeps the previously hard-coded address.
    public_host = os.getenv("PUBLIC_HOST", "192.168.1.3")
    port = os.getenv("PORT", "8009")
    download_url = f"http://{public_host}:{port}/files/{filename}"
    return (
        f"Audio generated successfully.\n"
        f"- **File:** {filename}\n"
        f"- **Size:** {size_mb} MB\n"
        f"- **Model:** {model}\n"
        f"- **Encoding:** {encoding}\n"
        f"- **Download:** {download_url}"
    )
@mcp.tool(description="List files in the upload directory.")
async def list_uploaded_files() -> str:
    """Return a markdown table of the files in the upload directory.

    Each row shows name, size in MB and modification time as reported by
    ``file_manager.list_files``; a fixed message is returned when the
    listing is empty.
    """
    entries = file_manager.list_files(file_manager.UPLOAD_DIR)
    if not entries:
        return "No uploaded files found."

    table = [
        "## Uploaded Files\n",
        "| File | Size (MB) | Modified |",
        "|------|-----------|----------|",
    ]
    table.extend(
        f"| {entry['name']} | {entry['size_mb']} | {entry['modified']} |"
        for entry in entries
    )
    return "\n".join(table)
@mcp.tool(description="Convert audio format or sample rate using ffmpeg. Useful for preprocessing before transcription.")
async def convert_audio(
    input_path: str,
    output_format: str = "wav",
    sample_rate: int | None = None,
) -> str:
    """Convert an audio file to another format and/or sample rate with ffmpeg.

    The result is written to ``file_manager.UPLOAD_DIR`` as
    ``<input stem>_converted.<output_format>``; an existing file with that
    name is overwritten (``-y``).

    Args:
        input_path: Path of the source audio file.
        output_format: Target file extension; letters and digits only.
        sample_rate: Optional target sample rate in Hz; falsy values leave
            the rate unchanged.

    Returns:
        A markdown summary on success, or an error string when the input is
        missing, an argument is invalid, or ffmpeg exits non-zero.
    """
    src = Path(input_path)
    if not src.is_file():
        return f"Error: Input file not found: {input_path}"

    # The format becomes part of the destination path, so anything but a
    # plain extension (e.g. "wav/../../x") could write outside UPLOAD_DIR.
    if not output_format.isalnum():
        return f"Error: Invalid output format: {output_format}"
    if sample_rate is not None and sample_rate < 0:
        return f"Error: Invalid sample rate: {sample_rate}"

    stem = src.stem
    dest = file_manager.UPLOAD_DIR / f"{stem}_converted.{output_format}"

    # -nostdin: ffmpeg must never wait on, or consume, the server's stdin.
    cmd = ["ffmpeg", "-nostdin", "-i", str(src), "-y"]
    if sample_rate:
        cmd.extend(["-ar", str(sample_rate)])
    cmd.append(str(dest))

    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()

    if proc.returncode != 0:
        # ffmpeg may echo non-UTF-8 metadata; never fail while reporting.
        return f"Conversion failed: {stderr.decode(errors='replace').strip()}"

    size_mb = round(dest.stat().st_size / (1024 * 1024), 2)
    return (
        f"Converted successfully.\n"
        f"- **Output:** {dest}\n"
        f"- **Format:** {output_format}\n"
        f"- **Size:** {size_mb} MB"
    )
@mcp.custom_route("/upload", methods=["POST"])
async def upload_endpoint(request: Request) -> Response:
    """Accept a multipart upload and store it in the upload directory.

    Expects a multipart/form-data body with a file part named ``file``. The
    whole file is read into memory before being passed to
    ``file_manager.save_upload``.

    Returns:
        200 with the JSON returned by ``save_upload``, or 400 with an
        ``error`` message when the content type is wrong, the ``file`` field
        is missing, or it is not a file part.
    """
    content_type = request.headers.get("content-type", "")
    if "multipart/form-data" not in content_type:
        return JSONResponse(
            {"error": "Content-Type must be multipart/form-data"},
            status_code=400,
        )

    form = await request.form()
    upload = form.get("file")
    if upload is None:
        return JSONResponse({"error": "No 'file' field in form data"}, status_code=400)
    # A plain text field (e.g. `-F "file=abc"`) is parsed as a str, which has
    # neither .read() nor .filename.
    if isinstance(upload, str):
        return JSONResponse(
            {"error": "'file' field must be a file upload"},
            status_code=400,
        )

    content = await upload.read()
    result = await file_manager.save_upload(upload.filename or "upload", content)
    return JSONResponse(result)
"ffprobe", + "-v", "quiet", + "-print_format", "json", + "-show_format", + str(file_path), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + + if proc.returncode != 0: + raise RuntimeError( + f"ffprobe failed (exit {proc.returncode}): {stderr.decode().strip()}" + ) + + info = json.loads(stdout) + return float(info["format"]["duration"]) + + +def get_file_size_mb(file_path: Path) -> float: + """Return the file size in megabytes.""" + return file_path.stat().st_size / (1024 * 1024) + + +async def split_audio( + file_path: Path, + max_chunk_mb: int = 1500, +) -> list[Path]: + """Split an audio file into chunks of approximately max_chunk_mb each. + + Uses ffmpeg's segment muxer with stream copy (no re-encoding). + If the file is already under the limit, returns [file_path] unchanged. + """ + size_mb = get_file_size_mb(file_path) + if size_mb <= max_chunk_mb: + return [file_path] + + duration = await get_audio_duration(file_path) + if duration <= 0: + raise ValueError(f"Invalid audio duration: {duration}s") + + # Calculate segment time so each chunk is ~max_chunk_mb + segment_time = int(duration * max_chunk_mb / size_mb) + if segment_time < 1: + segment_time = 1 + + tmp_dir = Path(tempfile.mkdtemp(prefix="deepgram_chunks_")) + ext = file_path.suffix or ".wav" + pattern = str(tmp_dir / f"chunk_%03d{ext}") + + proc = await asyncio.create_subprocess_exec( + "ffmpeg", + "-i", str(file_path), + "-f", "segment", + "-segment_time", str(segment_time), + "-c", "copy", + "-v", "warning", + pattern, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + _, stderr = await proc.communicate() + + if proc.returncode != 0: + shutil.rmtree(tmp_dir, ignore_errors=True) + raise RuntimeError( + f"ffmpeg split failed (exit {proc.returncode}): {stderr.decode().strip()}" + ) + + chunks = sorted(tmp_dir.glob(f"chunk_*{ext}")) + if not chunks: + shutil.rmtree(tmp_dir, ignore_errors=True) + raise 
RuntimeError("ffmpeg produced no output chunks")
+
+    return chunks
+
+
+def merge_transcription_results(
+    results: list[dict],
+    chunk_durations: list[float],
+) -> dict:
+    """Merge multiple Deepgram transcription responses into a single result.
+
+    Adjusts all timestamps by cumulative offset so chunks stitch together
+    correctly in the final timeline. Only the first channel / first
+    alternative of each response is merged.
+
+    Args:
+        results: Per-chunk Deepgram responses, in chunk order.
+        chunk_durations: Duration in seconds of each chunk, same order.
+
+    Returns:
+        ``{}`` for no results, the single response itself for one result,
+        otherwise a new dict based on a deep copy of the first response; the
+        input responses are not modified.
+    """
+    if not results:
+        return {}
+    if len(results) == 1:
+        return results[0]
+
+    import copy  # local import: only needed on the multi-chunk path
+
+    # Compute cumulative time offsets for each chunk
+    offsets = [0.0]
+    for dur in chunk_durations[:-1]:
+        offsets.append(offsets[-1] + dur)
+
+    merged_transcript_parts: list[str] = []
+    merged_words: list[dict] = []
+    merged_paragraphs: list[dict] = []
+    merged_utterances: list[dict] = []
+    merged_topics: list[dict] = []
+    merged_entities: list[dict] = []
+    merged_summaries: list[dict] = []
+    merged_sentiments: list[dict] = []
+
+    # Keep metadata from the first result as the base. A deep copy is needed:
+    # the nested "results" dict is rewritten below, and a shallow copy would
+    # rewrite results[0] in place as well.
+    base = copy.deepcopy(results[0])
+    metadata = base.get("metadata")
+    if isinstance(metadata, dict) and "duration" in metadata:
+        # The merged response covers every chunk, not just the first one.
+        metadata["duration"] = round(sum(chunk_durations), 3)
+
+    for idx, result in enumerate(results):
+        offset = offsets[idx]
+
+        # Extract channel transcript data
+        channels = (
+            result.get("results", {}).get("channels", [])
+        )
+        if channels:
+            alt = channels[0].get("alternatives", [{}])[0]
+            transcript = alt.get("transcript", "")
+            if transcript:
+                merged_transcript_parts.append(transcript)
+
+            for word in alt.get("words", []):
+                adjusted = word.copy()
+                adjusted["start"] = round(word.get("start", 0) + offset, 3)
+                adjusted["end"] = round(word.get("end", 0) + offset, 3)
+                merged_words.append(adjusted)
+
+            for para in alt.get("paragraphs", {}).get("paragraphs", []):
+                adjusted = para.copy()
+                adjusted["start"] = round(para.get("start", 0) + offset, 3)
+                adjusted["end"] = round(para.get("end", 0) + offset, 3)
+                if "sentences" in adjusted:
+                    adjusted["sentences"] = [
+                        {
+                            **s,
+                            "start": round(s.get("start", 0) + offset, 3),
+                            "end": round(s.get("end", 0) + offset, 3),
+                        }
+                        for s in adjusted["sentences"]
+                    ]
+                merged_paragraphs.append(adjusted)
+
+        # Utterances (diarization)
+        for utt in result.get("results", {}).get("utterances", []):
+            adjusted = utt.copy()
+            adjusted["start"] = round(utt.get("start", 0) + offset, 3)
+            adjusted["end"] = round(utt.get("end", 0) + offset, 3)
+            if "words" in adjusted:
+                adjusted["words"] = [
+                    {
+                        **w,
+                        "start": round(w.get("start", 0) + offset, 3),
+                        "end": round(w.get("end", 0) + offset, 3),
+                    }
+                    for w in adjusted["words"]
+                ]
+            merged_utterances.append(adjusted)
+
+        # Topics, entities, summaries, sentiments -- concatenate lists
+        # (segments are appended as-is; no offset is applied to them)
+        res = result.get("results", {})
+        merged_topics.extend(res.get("topics", {}).get("segments", []))
+        merged_entities.extend(res.get("entities", {}).get("segments", []))
+        merged_summaries.extend(
+            res.get("summary", {}).get("results", [])
+            or res.get("summaries", [])
+        )
+        merged_sentiments.extend(
+            res.get("sentiments", {}).get("segments", [])
+        )
+
+    # Assemble merged output
+    if "results" not in base:
+        base["results"] = {}
+
+    merged_results = base["results"]
+
+    # Rebuild channels: the first channel's first alternative receives the
+    # stitched transcript, words and paragraphs.
+    if merged_results.get("channels"):
+        channel = merged_results["channels"][0]
+        alt = channel.get("alternatives", [{}])[0]
+        alt["transcript"] = " ".join(merged_transcript_parts)
+        alt["words"] = merged_words
+        if merged_paragraphs:
+            alt["paragraphs"] = {"paragraphs": merged_paragraphs}
+        channel["alternatives"] = [alt]
+        merged_results["channels"] = [channel]
+
+    if merged_utterances:
+        merged_results["utterances"] = merged_utterances
+    if merged_topics:
+        merged_results.setdefault("topics", {})["segments"] = merged_topics
+    if merged_entities:
+        merged_results.setdefault("entities", {})["segments"] = merged_entities
+    if merged_summaries:
+        merged_results["summaries"] = merged_summaries
+    if merged_sentiments:
+        merged_results.setdefault("sentiments", {})["segments"] = merged_sentiments
+
+    return base
+
+
+def cleanup_chunks(chunk_paths: list[Path]) -> None:
+    """Delete temporary chunk files and their parent directory if it's a temp dir."""
+    if not chunk_paths:
+        return
+
+    parent =
chunk_paths[0].parent
+
+    # split_audio() returns [file_path] unchanged for inputs under the size
+    # limit, so the list can hold the caller's original file. Only delete
+    # anything when the files live in one of our own temp directories.
+    if not parent.name.startswith("deepgram_chunks_"):
+        return
+
+    for path in chunk_paths:
+        try:
+            if path.is_file():
+                path.unlink()
+        except OSError:
+            # Best effort: rmtree below removes whatever is left.
+            pass
+
+    # Remove the temp directory together with any remaining content
+    shutil.rmtree(parent, ignore_errors=True)
diff --git a/deepgram-mcp/src/deepgram_mcp/transcription.py b/deepgram-mcp/src/deepgram_mcp/transcription.py
new file mode 100644
index 0000000..7e8ebc1
--- /dev/null
+++ b/deepgram-mcp/src/deepgram_mcp/transcription.py
@@ -0,0 +1,211 @@
+"""Speech-to-text transcription via Deepgram REST API (httpx)."""
+
+import os
+from pathlib import Path
+from typing import Union
+
+import httpx
+
+DEEPGRAM_API_URL = "https://api.deepgram.com/v1/listen"
+
+# Content-Type sent to Deepgram, keyed by lower-cased file suffix.
+MIME_TYPES: dict[str, str] = {
+    ".mp3": "audio/mpeg",
+    ".wav": "audio/wav",
+    ".m4a": "audio/mp4",
+    ".flac": "audio/flac",
+    ".ogg": "audio/ogg",
+    ".webm": "audio/webm",
+    ".wma": "audio/x-ms-wma",
+    ".aac": "audio/aac",
+    ".mp4": "video/mp4",
+}
+
+# Files larger than this are split into chunks before upload.
+MAX_FILE_SIZE_MB = 2000
+
+
+def _get_api_key() -> str:
+    """Return DEEPGRAM_API_KEY from the environment; raise ValueError if unset or empty."""
+    key = os.getenv("DEEPGRAM_API_KEY", "")
+    if not key:
+        raise ValueError("DEEPGRAM_API_KEY environment variable is not set")
+    return key
+
+
+def _get_mime_type(file_path: Path) -> str:
+    """Map the file suffix to a MIME type, defaulting to application/octet-stream."""
+    return MIME_TYPES.get(file_path.suffix.lower(), "application/octet-stream")
+
+
+def build_query_params(params: dict) -> dict:
+    """Build Deepgram API query parameters from tool kwargs.
+
+    Filters None values, maps comma-separated strings to repeated params,
+    and converts booleans to lowercase strings.
+
+    Keys that appear in neither field list below are dropped. ``diarize`` is
+    set to ``"true"`` unless the caller passed a value for it.
+    """
+    filtered = {k: v for k, v in params.items() if v is not None}
+    query: dict = {}
+
+    # Direct fields (string/number/bool)
+    direct_fields = [
+        "model", "version", "language", "detect_language",
+        "smart_format", "punctuate", "paragraphs", "numerals",
+        "measurements", "dictation",
+        "diarize", "utterances", "utt_split",
+        "summarize", "topics", "sentiment", "entities", "intents",
+        "profanity_filter", "filler_words",
+        "multichannel",
+        "encoding", "sample_rate",
+        "keyterm",
+    ]
+    for field in direct_fields:
+        if field in filtered:
+            val = filtered[field]
+            if isinstance(val, bool):
+                # True/False -> "true"/"false"
+                query[field] = str(val).lower()
+            else:
+                query[field] = val
+
+    # Default diarize to true (an explicit False was stored as "false" above
+    # and is therefore left alone)
+    if "diarize" not in query:
+        query["diarize"] = "true"
+
+    # Comma-separated -> repeated query params
+    csv_fields = [
+        "custom_topics", "custom_intents", "search",
+        "redact", "replace", "keywords",
+    ]
+    for field in csv_fields:
+        if field in filtered:
+            val = filtered[field]
+            if isinstance(val, str):
+                items = [s.strip() for s in val.split(",") if s.strip()]
+            elif isinstance(val, list):
+                # Lists are passed through as given (no stripping).
+                items = val
+            else:
+                continue
+            if items:
+                query[field] = items
+
+    return query
+
+
+async def transcribe(
+    source: Union[str, Path, bytes],
+    options: dict,
+) -> dict:
+    """Transcribe audio from a URL, file path, or raw bytes.
+
+    Returns the full Deepgram transcription response as a dict.
+
+    Raises:
+        ValueError: If DEEPGRAM_API_KEY is not set.
+        FileNotFoundError: If a path source does not point to a file.
+        TypeError: If ``source`` is not a str, Path or bytes.
+        httpx.HTTPStatusError: If Deepgram answers with an error status.
+    """
+    api_key = _get_api_key()
+    query_params = build_query_params(options)
+    headers = {"Authorization": f"Token {api_key}"}
+
+    # URL source: Deepgram fetches the audio itself, we only send the URL.
+    if isinstance(source, str) and source.startswith(("http://", "https://")):
+        headers["Content-Type"] = "application/json"
+        async with httpx.AsyncClient(timeout=600.0) as client:
+            resp = await client.post(
+                DEEPGRAM_API_URL,
+                params=query_params,
+                headers=headers,
+                json={"url": source},
+            )
+            resp.raise_for_status()
+            return resp.json()
+
+    # File path source (any str that is not an http(s) URL is treated as a path)
+    if isinstance(source, (str, Path)):
+        file_path = Path(source)
+        if not file_path.is_file():
+            raise FileNotFoundError(f"Audio file not found: {file_path}")
+
+        file_size_mb = file_path.stat().st_size / (1024 * 1024)
+
+        # Large file handling via chunked splitting
+        if file_size_mb > MAX_FILE_SIZE_MB:
+            return await _transcribe_large_file(file_path, query_params, headers)
+
+        # NOTE(review): read_bytes() is a synchronous read of the whole file
+        # (up to MAX_FILE_SIZE_MB) inside a coroutine.
+        data = file_path.read_bytes()
+        mime_type = _get_mime_type(file_path)
+        headers["Content-Type"] = mime_type
+        async with httpx.AsyncClient(timeout=600.0) as client:
+            resp = await client.post(
+                DEEPGRAM_API_URL,
+                params=query_params,
+                headers=headers,
+                content=data,
+            )
+            resp.raise_for_status()
+            return resp.json()
+
+    # Raw bytes source: no file name to derive a MIME type from.
+    if isinstance(source, bytes):
+        headers["Content-Type"] = "application/octet-stream"
+        async with httpx.AsyncClient(timeout=600.0) as client:
+            resp = await client.post(
+                DEEPGRAM_API_URL,
+                params=query_params,
+                headers=headers,
+                content=source,
+            )
+            resp.raise_for_status()
+            return resp.json()
+
+    raise TypeError(f"Unsupported source type: {type(source)}")
+
+
+async def _transcribe_large_file(
+    file_path: Path, query_params: dict, headers: dict
+) -> dict:
+    """Split a large file into chunks, transcribe each, and merge results."""
+    from .
import splitter
+
+    chunks = await splitter.split_audio(file_path)
+    try:
+        results = []
+        chunk_durations = []
+        # One client for all chunks so the connection to Deepgram is reused
+        # instead of being re-established for every upload.
+        async with httpx.AsyncClient(timeout=600.0) as client:
+            for chunk in chunks:
+                data = chunk.read_bytes()
+                # Copy the caller's headers; only the Content-Type is per chunk.
+                chunk_headers = {
+                    **headers,
+                    "Content-Type": _get_mime_type(chunk),
+                }
+                resp = await client.post(
+                    DEEPGRAM_API_URL,
+                    params=query_params,
+                    headers=chunk_headers,
+                    content=data,
+                )
+                resp.raise_for_status()
+                result = resp.json()
+                results.append(result)
+                # The duration reported for each chunk is what the merge step
+                # uses to shift the timestamps of the chunks that follow.
+                duration = (result.get("metadata") or {}).get("duration", 0.0)
+                chunk_durations.append(duration)
+        return splitter.merge_transcription_results(results, chunk_durations)
+    finally:
+        # Runs on success and on any failure above.
+        splitter.cleanup_chunks(chunks)
+
+
+async def check_api_status() -> dict:
+    """Verify the Deepgram API key by listing projects.
+
+    Returns dict with 'valid' (bool), 'projects' (list), and 'error' (str|None).
+    Never raises: any exception is reported through the 'error' field.
+    """
+    try:
+        api_key = _get_api_key()
+        async with httpx.AsyncClient(timeout=30.0) as client:
+            resp = await client.get(
+                "https://api.deepgram.com/v1/projects",
+                headers={"Authorization": f"Token {api_key}"},
+            )
+            resp.raise_for_status()
+            data = resp.json()
+        projects = [
+            {"id": p.get("project_id", ""), "name": p.get("name", "")}
+            for p in data.get("projects", [])
+        ]
+        return {"valid": True, "projects": projects, "error": None}
+    except Exception as exc:
+        # Deliberately broad: a missing key, network failure or HTTP error all
+        # mean "not usable" to the caller, which only displays the message.
+        return {"valid": False, "projects": [], "error": str(exc)}
diff --git a/deepgram-mcp/src/deepgram_mcp/tts.py b/deepgram-mcp/src/deepgram_mcp/tts.py
new file mode 100644
index 0000000..df7778d
--- /dev/null
+++ b/deepgram-mcp/src/deepgram_mcp/tts.py
@@ -0,0 +1,197 @@
+"""Deepgram Text-to-Speech wrapper using Aura-2 voices (httpx REST API)."""
+
+from __future__ import annotations
+
+import os
+import time
+
+import httpx
+
+DEEPGRAM_TTS_URL = "https://api.deepgram.com/v1/speak"
+
+# File extension used for the suggested output file name, keyed by encoding.
+ENCODING_TO_EXT: dict[str, str] = {
+    "mp3": "mp3",
+    "linear16": "wav",
+    "wav": "wav",
+    "flac": "flac",
+
"opus": "opus", + "aac": "aac", + "mulaw": "wav", +} + +# Real Deepgram Aura-2 voice IDs (format: aura-2-{name}-{lang}) +VOICES: list[dict[str, str]] = [ + # English (US) - Feminine + {"id": "aura-2-asteria-en", "name": "Asteria", "language": "en", "locale": "en-US", "gender": "female", "description": "Warm professional"}, + {"id": "aura-2-luna-en", "name": "Luna", "language": "en", "locale": "en-US", "gender": "female", "description": "Soft gentle"}, + {"id": "aura-2-athena-en", "name": "Athena", "language": "en", "locale": "en-US", "gender": "female", "description": "Authoritative"}, + {"id": "aura-2-aurora-en", "name": "Aurora", "language": "en", "locale": "en-US", "gender": "female", "description": "Bright energetic"}, + {"id": "aura-2-thalia-en", "name": "Thalia", "language": "en", "locale": "en-US", "gender": "female", "description": "Natural conversational"}, + {"id": "aura-2-andromeda-en", "name": "Andromeda", "language": "en", "locale": "en-US", "gender": "female", "description": "Clear articulate"}, + {"id": "aura-2-helena-en", "name": "Helena", "language": "en", "locale": "en-US", "gender": "female", "description": "Elegant polished"}, + {"id": "aura-2-callista-en", "name": "Callista", "language": "en", "locale": "en-US", "gender": "female", "description": "Friendly upbeat"}, + {"id": "aura-2-cora-en", "name": "Cora", "language": "en", "locale": "en-US", "gender": "female", "description": "Calm soothing"}, + {"id": "aura-2-electra-en", "name": "Electra", "language": "en", "locale": "en-US", "gender": "female", "description": "Dynamic expressive"}, + {"id": "aura-2-iris-en", "name": "Iris", "language": "en", "locale": "en-US", "gender": "female", "description": "Bright cheerful"}, + {"id": "aura-2-juno-en", "name": "Juno", "language": "en", "locale": "en-US", "gender": "female", "description": "Confident mature"}, + {"id": "aura-2-minerva-en", "name": "Minerva", "language": "en", "locale": "en-US", "gender": "female", "description": "Wise scholarly"}, + 
{"id": "aura-2-ophelia-en", "name": "Ophelia", "language": "en", "locale": "en-US", "gender": "female", "description": "Dramatic expressive"}, + {"id": "aura-2-phoebe-en", "name": "Phoebe", "language": "en", "locale": "en-US", "gender": "female", "description": "Youthful fresh"}, + {"id": "aura-2-selene-en", "name": "Selene", "language": "en", "locale": "en-US", "gender": "female", "description": "Serene ethereal"}, + {"id": "aura-2-vesta-en", "name": "Vesta", "language": "en", "locale": "en-US", "gender": "female", "description": "Warm nurturing"}, + {"id": "aura-2-cordelia-en", "name": "Cordelia", "language": "en", "locale": "en-US", "gender": "female", "description": "Regal composed"}, + {"id": "aura-2-delia-en", "name": "Delia", "language": "en", "locale": "en-US", "gender": "female", "description": "Light melodic"}, + {"id": "aura-2-harmonia-en", "name": "Harmonia", "language": "en", "locale": "en-US", "gender": "female", "description": "Balanced harmonious"}, + {"id": "aura-2-amalthea-en", "name": "Amalthea", "language": "en", "locale": "en-US", "gender": "female", "description": "Gentle nurturing"}, + {"id": "aura-2-janus-en", "name": "Janus", "language": "en", "locale": "en-US", "gender": "female", "description": "Versatile adaptive"}, + # English (US) - Masculine + {"id": "aura-2-orion-en", "name": "Orion", "language": "en", "locale": "en-US", "gender": "male", "description": "Deep resonant"}, + {"id": "aura-2-arcas-en", "name": "Arcas", "language": "en", "locale": "en-US", "gender": "male", "description": "Youthful energetic"}, + {"id": "aura-2-orpheus-en", "name": "Orpheus", "language": "en", "locale": "en-US", "gender": "male", "description": "Expressive poetic"}, + {"id": "aura-2-zeus-en", "name": "Zeus", "language": "en", "locale": "en-US", "gender": "male", "description": "Commanding powerful"}, + {"id": "aura-2-apollo-en", "name": "Apollo", "language": "en", "locale": "en-US", "gender": "male", "description": "Bright confident"}, + {"id": 
"aura-2-atlas-en", "name": "Atlas", "language": "en", "locale": "en-US", "gender": "male", "description": "Strong steady"}, + {"id": "aura-2-hermes-en", "name": "Hermes", "language": "en", "locale": "en-US", "gender": "male", "description": "Quick articulate"}, + {"id": "aura-2-jupiter-en", "name": "Jupiter", "language": "en", "locale": "en-US", "gender": "male", "description": "Authoritative warm"}, + {"id": "aura-2-mars-en", "name": "Mars", "language": "en", "locale": "en-US", "gender": "male", "description": "Bold assertive"}, + {"id": "aura-2-neptune-en", "name": "Neptune", "language": "en", "locale": "en-US", "gender": "male", "description": "Calm deep"}, + {"id": "aura-2-odysseus-en", "name": "Odysseus", "language": "en", "locale": "en-US", "gender": "male", "description": "Storyteller adventurous"}, + {"id": "aura-2-pluto-en", "name": "Pluto", "language": "en", "locale": "en-US", "gender": "male", "description": "Dark mysterious"}, + {"id": "aura-2-saturn-en", "name": "Saturn", "language": "en", "locale": "en-US", "gender": "male", "description": "Mature wise"}, + {"id": "aura-2-aries-en", "name": "Aries", "language": "en", "locale": "en-US", "gender": "male", "description": "Energetic dynamic"}, + # English (GB) + {"id": "aura-2-pandora-en", "name": "Pandora", "language": "en", "locale": "en-GB", "gender": "female", "description": "British female"}, + {"id": "aura-2-draco-en", "name": "Draco", "language": "en", "locale": "en-GB", "gender": "male", "description": "British male"}, + # English (AU) + {"id": "aura-2-theia-en", "name": "Theia", "language": "en", "locale": "en-AU", "gender": "female", "description": "Australian female"}, + {"id": "aura-2-hyperion-en", "name": "Hyperion", "language": "en", "locale": "en-AU", "gender": "male", "description": "Australian male"}, + # Spanish - Mexican + {"id": "aura-2-estrella-es", "name": "Estrella", "language": "es", "locale": "es-MX", "gender": "female", "description": "Mexican female"}, + {"id": 
"aura-2-olivia-es", "name": "Olivia", "language": "es", "locale": "es-MX", "gender": "female", "description": "Mexican female warm"}, + {"id": "aura-2-sirio-es", "name": "Sirio", "language": "es", "locale": "es-MX", "gender": "male", "description": "Mexican male"}, + {"id": "aura-2-javier-es", "name": "Javier", "language": "es", "locale": "es-MX", "gender": "male", "description": "Mexican male warm"}, + {"id": "aura-2-luciano-es", "name": "Luciano", "language": "es", "locale": "es-MX", "gender": "male", "description": "Mexican male expressive"}, + {"id": "aura-2-valerio-es", "name": "Valerio", "language": "es", "locale": "es-MX", "gender": "male", "description": "Mexican male confident"}, + # Spanish - Peninsular + {"id": "aura-2-carina-es", "name": "Carina", "language": "es", "locale": "es-ES", "gender": "female", "description": "Castilian female"}, + {"id": "aura-2-diana-es", "name": "Diana", "language": "es", "locale": "es-ES", "gender": "female", "description": "Castilian female elegant"}, + {"id": "aura-2-agustina-es", "name": "Agustina", "language": "es", "locale": "es-ES", "gender": "female", "description": "Castilian female classic"}, + {"id": "aura-2-silvia-es", "name": "Silvia", "language": "es", "locale": "es-ES", "gender": "female", "description": "Castilian female bright"}, + {"id": "aura-2-nestor-es", "name": "Nestor", "language": "es", "locale": "es-ES", "gender": "male", "description": "Castilian male"}, + {"id": "aura-2-alvaro-es", "name": "Alvaro", "language": "es", "locale": "es-ES", "gender": "male", "description": "Castilian male confident"}, + # Spanish - Colombian / Argentine / LatAm + {"id": "aura-2-celeste-es", "name": "Celeste", "language": "es", "locale": "es-CO", "gender": "female", "description": "Colombian female"}, + {"id": "aura-2-gloria-es", "name": "Gloria", "language": "es", "locale": "es-CO", "gender": "female", "description": "Colombian female warm"}, + {"id": "aura-2-antonia-es", "name": "Antonia", "language": "es", "locale": 
"es-AR", "gender": "female", "description": "Argentine female"}, + {"id": "aura-2-aquila-es", "name": "Aquila", "language": "es", "locale": "es-419", "gender": "male", "description": "Latin American male"}, + {"id": "aura-2-selena-es", "name": "Selena", "language": "es", "locale": "es-419", "gender": "female", "description": "Latin American female"}, + # German + {"id": "aura-2-elara-de", "name": "Elara", "language": "de", "locale": "de-DE", "gender": "female", "description": "German female natural"}, + {"id": "aura-2-aurelia-de", "name": "Aurelia", "language": "de", "locale": "de-DE", "gender": "female", "description": "German female elegant"}, + {"id": "aura-2-lara-de", "name": "Lara", "language": "de", "locale": "de-DE", "gender": "female", "description": "German female youthful"}, + {"id": "aura-2-kara-de", "name": "Kara", "language": "de", "locale": "de-DE", "gender": "female", "description": "German female confident"}, + {"id": "aura-2-viktoria-de", "name": "Viktoria", "language": "de", "locale": "de-DE", "gender": "female", "description": "German female strong"}, + {"id": "aura-2-julius-de", "name": "Julius", "language": "de", "locale": "de-DE", "gender": "male", "description": "German male professional"}, + {"id": "aura-2-fabian-de", "name": "Fabian", "language": "de", "locale": "de-DE", "gender": "male", "description": "German male warm"}, + # French + {"id": "aura-2-agathe-fr", "name": "Agathe", "language": "fr", "locale": "fr-FR", "gender": "female", "description": "French female"}, + {"id": "aura-2-hector-fr", "name": "Hector", "language": "fr", "locale": "fr-FR", "gender": "male", "description": "French male"}, + # Dutch + {"id": "aura-2-beatrix-nl", "name": "Beatrix", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female classic"}, + {"id": "aura-2-daphne-nl", "name": "Daphne", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female natural"}, + {"id": "aura-2-cornelia-nl", "name": 
"Cornelia", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female warm"}, + {"id": "aura-2-hestia-nl", "name": "Hestia", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female gentle"}, + {"id": "aura-2-rhea-nl", "name": "Rhea", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female bright"}, + {"id": "aura-2-leda-nl", "name": "Leda", "language": "nl", "locale": "nl-NL", "gender": "female", "description": "Dutch female elegant"}, + {"id": "aura-2-sander-nl", "name": "Sander", "language": "nl", "locale": "nl-NL", "gender": "male", "description": "Dutch male natural"}, + {"id": "aura-2-lars-nl", "name": "Lars", "language": "nl", "locale": "nl-NL", "gender": "male", "description": "Dutch male confident"}, + {"id": "aura-2-roman-nl", "name": "Roman", "language": "nl", "locale": "nl-NL", "gender": "male", "description": "Dutch male warm"}, + # Italian + {"id": "aura-2-melia-it", "name": "Melia", "language": "it", "locale": "it-IT", "gender": "female", "description": "Italian female natural"}, + {"id": "aura-2-maia-it", "name": "Maia", "language": "it", "locale": "it-IT", "gender": "female", "description": "Italian female warm"}, + {"id": "aura-2-cinzia-it", "name": "Cinzia", "language": "it", "locale": "it-IT", "gender": "female", "description": "Italian female elegant"}, + {"id": "aura-2-livia-it", "name": "Livia", "language": "it", "locale": "it-IT", "gender": "female", "description": "Italian female classic"}, + {"id": "aura-2-demetra-it", "name": "Demetra", "language": "it", "locale": "it-IT", "gender": "female", "description": "Italian female strong"}, + {"id": "aura-2-elio-it", "name": "Elio", "language": "it", "locale": "it-IT", "gender": "male", "description": "Italian male bright"}, + {"id": "aura-2-flavio-it", "name": "Flavio", "language": "it", "locale": "it-IT", "gender": "male", "description": "Italian male warm"}, + {"id": "aura-2-cesare-it", "name": "Cesare", 
"language": "it", "locale": "it-IT", "gender": "male", "description": "Italian male authoritative"}, + {"id": "aura-2-perseo-it", "name": "Perseo", "language": "it", "locale": "it-IT", "gender": "male", "description": "Italian male dynamic"}, + {"id": "aura-2-dionisio-it", "name": "Dionisio", "language": "it", "locale": "it-IT", "gender": "male", "description": "Italian male expressive"}, + # Japanese + {"id": "aura-2-uzume-ja", "name": "Uzume", "language": "ja", "locale": "ja-JP", "gender": "female", "description": "Japanese female natural"}, + {"id": "aura-2-izanami-ja", "name": "Izanami", "language": "ja", "locale": "ja-JP", "gender": "female", "description": "Japanese female elegant"}, + {"id": "aura-2-ebisu-ja", "name": "Ebisu", "language": "ja", "locale": "ja-JP", "gender": "male", "description": "Japanese male warm"}, + {"id": "aura-2-fujin-ja", "name": "Fujin", "language": "ja", "locale": "ja-JP", "gender": "male", "description": "Japanese male dynamic"}, + {"id": "aura-2-ama-ja", "name": "Ama", "language": "ja", "locale": "ja-JP", "gender": "male", "description": "Japanese male natural"}, +] + + +def list_voices(language: str | None = None) -> list[dict[str, str]]: + """Return voices, optionally filtered by language code. + + The filter is case-insensitive and matches both short codes ("en") + and full locale codes ("en-US"). 
+    """
+    if language is None:
+        return sorted(VOICES, key=lambda v: (v["language"], v["name"]))
+
+    lang = language.lower()
+    filtered = [
+        v for v in VOICES
+        if v["language"].lower() == lang or v["locale"].lower() == lang
+    ]
+    return sorted(filtered, key=lambda v: (v["language"], v["name"]))
+
+
+def get_voice_info(voice_id: str) -> dict[str, str] | None:
+    """Return voice info dict for a given voice ID, or None if not found.
+
+    The match is an exact, case-sensitive comparison against ``VOICES[*]["id"]``.
+    """
+    for voice in VOICES:
+        if voice["id"] == voice_id:
+            return voice
+    return None
+
+
+async def text_to_speech(
+    text: str,
+    model: str = "aura-2-asteria-en",
+    encoding: str = "mp3",
+    sample_rate: int = 24000,
+    container: str | None = None,
+) -> tuple[bytes, str]:
+    """Convert text to speech using Deepgram Aura-2 REST API.
+
+    Args:
+        text: Text to synthesize.
+        model: Deepgram voice/model ID.
+        encoding: Requested audio encoding; also selects the file extension
+            through ``ENCODING_TO_EXT``.
+        sample_rate: Output sample rate in Hz. Only sent for encodings whose
+            rate is configurable; ignored for ``mp3``, ``opus`` and ``aac``.
+        container: Optional container parameter, sent only when given.
+
+    Returns a tuple of (audio_bytes, suggested_filename).
+
+    Raises:
+        ValueError: If DEEPGRAM_API_KEY is not set.
+        httpx.HTTPStatusError: If Deepgram answers with an error status.
+    """
+    api_key = os.environ.get("DEEPGRAM_API_KEY", "")
+    if not api_key:
+        raise ValueError("DEEPGRAM_API_KEY environment variable is not set")
+
+    params: dict = {
+        "model": model,
+        "encoding": encoding,
+    }
+    # Deepgram documents the sample rate of mp3, opus and aac output as not
+    # configurable, so the parameter is only sent for the other encodings;
+    # otherwise the default call (mp3 + 24000) carries an inapplicable value.
+    if encoding not in ("mp3", "opus", "aac"):
+        params["sample_rate"] = str(sample_rate)
+    if container is not None:
+        params["container"] = container
+
+    headers = {
+        "Authorization": f"Token {api_key}",
+        "Content-Type": "application/json",
+    }
+
+    async with httpx.AsyncClient(timeout=120.0) as client:
+        resp = await client.post(
+            DEEPGRAM_TTS_URL,
+            params=params,
+            headers=headers,
+            json={"text": text},
+        )
+        resp.raise_for_status()
+        audio_bytes = resp.content
+
+    # Unknown encodings fall back to using the encoding name as extension.
+    ext = ENCODING_TO_EXT.get(encoding, encoding)
+    # NOTE(review): for IDs like "aura-2-asteria-en" this yields the trailing
+    # language code ("en"), not the voice name, and the timestamp has
+    # one-second resolution -- confirm the resulting names are unique enough.
+    model_short = model.rsplit("-", 1)[-1]
+    timestamp = int(time.time())
+    filename = f"tts_{timestamp}_{model_short}.{ext}"
+
+    return audio_bytes, filename