Files
nuc/deepgram-mcp/src/deepgram_mcp/splitter.py
Alejandro Gutiérrez 0ba2896565 Add Deepgram MCP Server - speech-to-text and TTS
Python FastMCP server wrapping Deepgram API for audio transcription
and text-to-speech. Supports 125+ multilingual voices, large file
chunking via FFmpeg, formatted markdown output with speaker
diarization, and Docker deployment on port 8009.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-18 15:17:52 +01:00

231 lines
7.5 KiB
Python

"""FFmpeg-based audio splitting for files exceeding the Deepgram size limit."""
import asyncio
import json
import shutil
import tempfile
from pathlib import Path
async def get_audio_duration(file_path: Path) -> float:
    """Return the duration of a media file in seconds, as reported by ffprobe.

    Runs ``ffprobe -show_format`` with JSON output and reads the
    ``format.duration`` field from the result.

    Args:
        file_path: Path of the file to probe.

    Returns:
        The duration in seconds.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status, or if its
            output does not contain a parseable ``format.duration`` value.
    """
    proc = await asyncio.create_subprocess_exec(
        "ffprobe",
        "-v", "quiet",
        "-print_format", "json",
        "-show_format",
        str(file_path),
        # Do not let the child inherit (and possibly read from) our stdin.
        stdin=asyncio.subprocess.DEVNULL,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()

    if proc.returncode != 0:
        # errors="replace": undecodable bytes in the tool's output must not
        # mask the real failure with a UnicodeDecodeError.
        message = stderr.decode(errors="replace").strip()
        raise RuntimeError(
            f"ffprobe failed (exit {proc.returncode}): {message}"
        )

    try:
        info = json.loads(stdout)
        return float(info["format"]["duration"])
    except (KeyError, TypeError, ValueError) as exc:
        # Covers invalid JSON, a missing "format"/"duration" key, and
        # non-numeric values such as "N/A".
        raise RuntimeError(
            f"ffprobe did not report a usable duration for {file_path}"
        ) from exc
def get_file_size_mb(file_path: Path) -> float:
    """Report how large the file at ``file_path`` is, in mebibytes (1024 * 1024 bytes)."""
    bytes_per_mb = 1024 * 1024
    size_in_bytes = file_path.stat().st_size
    return size_in_bytes / bytes_per_mb
async def split_audio(
    file_path: Path,
    max_chunk_mb: int = 1500,
) -> list[Path]:
    """Split an audio file into chunks of approximately ``max_chunk_mb`` each.

    Uses ffmpeg's segment muxer with stream copy (no re-encoding). The
    segment length is derived from the file's average size per second, so
    individual chunk sizes are approximate.

    Args:
        file_path: Path of the audio file to split.
        max_chunk_mb: Target maximum chunk size in megabytes.

    Returns:
        ``[file_path]`` unchanged if the file is already within the limit;
        otherwise the sorted chunk paths, located in a freshly created
        temporary directory whose name starts with ``deepgram_chunks_``.

    Raises:
        ValueError: If the probed duration is not positive.
        RuntimeError: If ffmpeg fails or produces no output files.
    """
    size_mb = get_file_size_mb(file_path)
    if size_mb <= max_chunk_mb:
        return [file_path]

    duration = await get_audio_duration(file_path)
    if duration <= 0:
        raise ValueError(f"Invalid audio duration: {duration}s")

    # Seconds of audio that correspond to ~max_chunk_mb at the file's average
    # bitrate; the segment muxer needs at least one second.
    segment_time = max(1, int(duration * max_chunk_mb / size_mb))

    tmp_dir = Path(tempfile.mkdtemp(prefix="deepgram_chunks_"))
    ext = file_path.suffix or ".wav"
    pattern = str(tmp_dir / f"chunk_%03d{ext}")

    proc = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "ffmpeg",
            # ffmpeg reads stdin for interactive commands by default; disable
            # that so it cannot consume or block on the parent's stdin.
            "-nostdin",
            "-i", str(file_path),
            "-f", "segment",
            "-segment_time", str(segment_time),
            # Start every segment near timestamp zero so each chunk is a
            # self-contained file with chunk-local timing.
            "-reset_timestamps", "1",
            "-c", "copy",
            "-v", "warning",
            pattern,
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await proc.communicate()

        if proc.returncode != 0:
            message = stderr.decode(errors="replace").strip()
            raise RuntimeError(
                f"ffmpeg split failed (exit {proc.returncode}): {message}"
            )

        chunks = sorted(tmp_dir.glob(f"chunk_*{ext}"))
        if not chunks:
            raise RuntimeError("ffmpeg produced no output chunks")
    except BaseException:
        # Any failure -- ffmpeg missing, task cancellation, or the errors
        # raised above -- must not leave the temp directory (or a still
        # running ffmpeg writing into it) behind.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
        shutil.rmtree(tmp_dir, ignore_errors=True)
        raise

    return chunks
def _shift_times(item: dict, offset: float) -> dict:
    """Return a shallow copy of ``item`` with ``start``/``end`` moved by ``offset``."""
    return {
        **item,
        "start": round(item.get("start", 0) + offset, 3),
        "end": round(item.get("end", 0) + offset, 3),
    }


def merge_transcription_results(
    results: list[dict],
    chunk_durations: list[float],
) -> dict:
    """Merge multiple Deepgram transcription responses into a single result.

    Timestamps of words, paragraphs, sentences and utterances are shifted by
    the cumulative duration of the preceding chunks so they line up on one
    timeline. Topic, entity, summary and sentiment segments are concatenated
    as-is. Only the first channel and its first alternative are merged.
    Top-level metadata is taken from the first response.

    The input responses are not modified.

    Args:
        results: One response dict per chunk, in chunk order.
        chunk_durations: Duration in seconds of each chunk, in the same
            order. Must have at least as many entries as ``results``.

    Returns:
        ``{}`` for no results, the single response itself for one result,
        otherwise a new merged response dict.

    Raises:
        ValueError: If fewer durations than results are supplied.
    """
    if not results:
        return {}
    if len(results) == 1:
        return results[0]
    if len(chunk_durations) < len(results):
        raise ValueError(
            f"Expected at least {len(results)} chunk durations, "
            f"got {len(chunk_durations)}"
        )

    # Chunk i starts where chunks 0..i-1 end; the last duration is not needed.
    offsets = [0.0]
    for dur in chunk_durations[: len(results) - 1]:
        offsets.append(offsets[-1] + dur)

    merged_transcript_parts: list[str] = []
    merged_words: list[dict] = []
    merged_paragraphs: list[dict] = []
    merged_utterances: list[dict] = []
    merged_topics: list[dict] = []
    merged_entities: list[dict] = []
    merged_summaries: list[dict] = []
    merged_sentiments: list[dict] = []

    for result, offset in zip(results, offsets):
        res = result.get("results", {})

        # Channel transcript data (first channel, first alternative). A chunk
        # with no channels or no alternatives simply contributes nothing.
        channels = res.get("channels", [])
        alternatives = channels[0].get("alternatives", []) if channels else []
        if alternatives:
            alt = alternatives[0]
            transcript = alt.get("transcript", "")
            if transcript:
                merged_transcript_parts.append(transcript)

            merged_words.extend(
                _shift_times(word, offset) for word in alt.get("words", [])
            )

            for para in alt.get("paragraphs", {}).get("paragraphs", []):
                shifted = _shift_times(para, offset)
                if "sentences" in shifted:
                    shifted["sentences"] = [
                        _shift_times(s, offset) for s in shifted["sentences"]
                    ]
                merged_paragraphs.append(shifted)

        # Utterances (diarization)
        for utt in res.get("utterances", []):
            shifted = _shift_times(utt, offset)
            if "words" in shifted:
                shifted["words"] = [
                    _shift_times(w, offset) for w in shifted["words"]
                ]
            merged_utterances.append(shifted)

        # Topics, entities, summaries, sentiments -- concatenate lists
        merged_topics.extend(res.get("topics", {}).get("segments", []))
        merged_entities.extend(res.get("entities", {}).get("segments", []))
        merged_summaries.extend(
            res.get("summary", {}).get("results", [])
            or res.get("summaries", [])
        )
        merged_sentiments.extend(
            res.get("sentiments", {}).get("segments", [])
        )

    # Assemble the output on copies of every container we write into, so the
    # caller's first response is left untouched.
    base = {**results[0]}
    merged_results = dict(base.get("results") or {})
    base["results"] = merged_results

    # Rebuild channels, using the first response's channel as the template.
    base_channels = merged_results.get("channels")
    if base_channels:
        channel = {**base_channels[0]}
        template_alts = channel.get("alternatives") or [{}]
        alt = {**template_alts[0]}
        alt["transcript"] = " ".join(merged_transcript_parts)
        alt["words"] = merged_words
        if merged_paragraphs:
            alt["paragraphs"] = {"paragraphs": merged_paragraphs}
        channel["alternatives"] = [alt]
        merged_results["channels"] = [channel]

    if merged_utterances:
        merged_results["utterances"] = merged_utterances
    if merged_topics:
        merged_results["topics"] = {
            **(merged_results.get("topics") or {}),
            "segments": merged_topics,
        }
    if merged_entities:
        merged_results["entities"] = {
            **(merged_results.get("entities") or {}),
            "segments": merged_entities,
        }
    if merged_summaries:
        merged_results["summaries"] = merged_summaries
    if merged_sentiments:
        merged_results["sentiments"] = {
            **(merged_results.get("sentiments") or {}),
            "segments": merged_sentiments,
        }

    return base
def cleanup_chunks(chunk_paths: list[Path]) -> None:
    """Delete temporary chunk files together with their temp directories.

    Only paths located directly inside a directory whose name starts with
    ``deepgram_chunks_`` are removed; each such directory is deleted with
    everything in it. Any other path is left untouched, so passing a list
    that holds the original, unsplit input file does not delete that file.

    Removal is best-effort: filesystem errors are ignored.

    Args:
        chunk_paths: Paths to clean up. May be empty.
    """
    # dict.fromkeys de-duplicates while keeping first-seen order.
    temp_dirs = dict.fromkeys(
        path.parent
        for path in chunk_paths
        if path.parent.name.startswith("deepgram_chunks_")
    )
    for temp_dir in temp_dirs:
        shutil.rmtree(temp_dir, ignore_errors=True)