Diffstat (limited to 'overnight_transcoder.py')
-rwxr-xr-x  overnight_transcoder.py  1245
1 files changed, 1245 insertions, 0 deletions
diff --git a/overnight_transcoder.py b/overnight_transcoder.py
new file mode 100755
index 0000000..8a6d7d0
--- /dev/null
+++ b/overnight_transcoder.py
@@ -0,0 +1,1245 @@
+#!/usr/bin/env python3
+"""
+Overnight Video Transcoder
+Transcodes large video files to HEVC to save space.
+Runs via cron between 02:00-07:00.
+
+Usage:
+ overnight_transcoder.py # Normal run
+ overnight_transcoder.py --dry # Dry run - show what would be transcoded
+ overnight_transcoder.py --stats # Show statistics
+ overnight_transcoder.py --verbose # Extra debug output
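+
+Example crontab entry (illustrative only; adjust the interpreter path, script
+location, and log destination to your setup):
+    0 2 * * * /usr/bin/python3 /path/to/overnight_transcoder.py >> /var/log/transcoder_cron.log 2>&1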
+"""
+
+import os
+import sys
+import json
+import subprocess
+import shutil
+import logging
+import argparse
+import sqlite3
+import traceback
+import re
+import select
+import signal
+from pathlib import Path
+from datetime import datetime, time
+from typing import Optional, List, Dict, Tuple
+from contextlib import contextmanager
+
+# Parse arguments
+parser = argparse.ArgumentParser(description="Overnight video transcoder")
+parser.add_argument("--dry", action="store_true", help="Dry run - don't transcode, just show what would happen")
+parser.add_argument("--stats", action="store_true", help="Show cache statistics and exit")
+parser.add_argument("--failed", action="store_true", help="Show failed transcodes and exit")
+parser.add_argument("--retry-failed", action="store_true", help="Reset failed files to pending for retry")
+parser.add_argument("--clear-cache", action="store_true", help="Clear the cache and exit")
+parser.add_argument("--verbose", "-v", action="store_true", help="Verbose debug output")
+parser.add_argument("--json-log", action="store_true", help="Output structured JSON logs (one per line)")
+args = parser.parse_args()
+
+DRY_RUN = args.dry
+VERBOSE = args.verbose
+JSON_LOG = args.json_log
+
+# Track current transcode for signal handling
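+# (Updated in place by transcode_file() so the SIGINT/SIGTERM handler knows which encode, if any, to clean up.)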
+CURRENT_TRANSCODE = {"path": None, "pid": None, "start_time": None}
+
+# Configuration
+CONFIG = {
+ "source_dirs": [
+ "/disks/Plex/Anime",
+ "/disks/Plex/Films",
+ "/disks/Plex/TV",
+ ],
+ "cleanup_dir": "/disks/Plex/.cleanup",
+ "log_dir": "/var/log",
+ "log_file": "/var/log/transcoder.log",
+ "db_file": "/var/lib/transcoder/cache.db",
+ "video_extensions": [".mkv", ".mp4", ".avi", ".m4v", ".mov", ".wmv"],
+ "cutoff_time": time(6, 30), # Don't start new encodes after 06:30
+ "cleanup_days": 7, # Delete originals after 7 days
+
+ # FFmpeg settings
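+    # CRF ~20 with the "slow" preset favours quality over encode speed;
+    # a lower CRF means higher quality and larger output files.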
+ "crf": 20,
+ "preset": "slow",
+ "audio_codec": "ac3",
+ "audio_bitrate": "640k",
+ "threads": 12,
+}
+
+
+def init_db() -> sqlite3.Connection:
+ """Initialize the SQLite database for caching file info."""
+ db_path = Path(CONFIG["db_file"])
+ db_path.parent.mkdir(parents=True, exist_ok=True)
+
+ conn = sqlite3.connect(CONFIG["db_file"])
+
+ # Main files table - tracks all video files
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS files (
+ path TEXT PRIMARY KEY,
+ filename TEXT,
+ directory TEXT,
+ original_size INTEGER,
+ mtime REAL,
+ video_codec TEXT,
+ audio_codec TEXT,
+ is_hevc INTEGER,
+ first_seen REAL,
+ scanned_at REAL,
+
+ -- Transcode info (NULL if not transcoded)
+ status TEXT DEFAULT 'pending', -- pending, transcoding, done, failed, stalled
+ transcoded_at REAL,
+ transcoded_size INTEGER,
+ transcode_duration_secs REAL,
+ transcode_settings TEXT, -- JSON: {crf, preset, audio_codec, audio_bitrate}
+ failure_reason TEXT,
+
+ -- Computed (updated after transcode)
+ space_saved INTEGER,
+ compression_ratio REAL
+ )
+ """)
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_is_hevc ON files(is_hevc)")
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_status ON files(status)")
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_size ON files(original_size)")
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_directory ON files(directory)")
+
+ # Stats summary table - running totals
+ conn.execute("""
+ CREATE TABLE IF NOT EXISTS stats (
+ id INTEGER PRIMARY KEY CHECK (id = 1), -- Single row
+ total_files_transcoded INTEGER DEFAULT 0,
+ total_space_saved INTEGER DEFAULT 0,
+ total_transcode_time_secs REAL DEFAULT 0,
+ last_updated REAL
+ )
+ """)
+ conn.execute("INSERT OR IGNORE INTO stats (id) VALUES (1)")
+
+ conn.commit()
+ return conn
+
+
+def get_audio_codec(filepath: str) -> Optional[str]:
+ """Get the audio codec of a file using ffprobe."""
+ try:
+ result = subprocess.run([
+ "ffprobe", "-v", "quiet",
+ "-select_streams", "a:0",
+ "-show_entries", "stream=codec_name",
+ "-of", "json",
+ filepath
+ ], capture_output=True, text=True, timeout=30)
+
+ if result.returncode == 0:
+ data = json.loads(result.stdout)
+ streams = data.get("streams", [])
+ if streams:
+ return streams[0].get("codec_name", "").lower()
+ except Exception as e:
+ log.warning(f"Failed to probe audio {filepath}: {e}")
+ return None
+
+
+def get_cached_file(conn: sqlite3.Connection, filepath: str, size: int, mtime: float) -> Optional[dict]:
+ """Get cached file info if file hasn't changed. Returns dict or None."""
+ cursor = conn.execute(
+ """SELECT video_codec, audio_codec, is_hevc, status
+ FROM files WHERE path = ? AND original_size = ? AND mtime = ?""",
+ (filepath, size, mtime)
+ )
+ row = cursor.fetchone()
+ if row:
+ return {
+ "video_codec": row[0],
+ "audio_codec": row[1],
+ "is_hevc": bool(row[2]),
+ "status": row[3]
+ }
+ return None
+
+
+def cache_file(conn: sqlite3.Connection, filepath: str, size: int, mtime: float,
+ video_codec: str, audio_codec: str, is_hevc: bool):
+ """Cache file info."""
+ p = Path(filepath)
+ now = datetime.now().timestamp()
+
+ # Check if file already exists (to preserve first_seen)
+ cursor = conn.execute("SELECT first_seen FROM files WHERE path = ?", (filepath,))
+ row = cursor.fetchone()
+ first_seen = row[0] if row else now
+
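+    # INSERT OR REPLACE would reset status to the default, so the COALESCE
+    # subquery below carries any existing status ('done', 'failed', ...) across rescans.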
+ conn.execute("""
+ INSERT OR REPLACE INTO files
+ (path, filename, directory, original_size, mtime, video_codec, audio_codec,
+ is_hevc, first_seen, scanned_at, status)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
+ COALESCE((SELECT status FROM files WHERE path = ?), 'pending'))
+ """, (filepath, p.name, str(p.parent), size, mtime, video_codec, audio_codec,
+ int(is_hevc), first_seen, now, filepath))
+ conn.commit()
+
+
+def update_transcode_result(conn: sqlite3.Connection, filepath: str, new_path: str,
+ new_size: int, duration_secs: float, settings: dict,
+ success: bool, failure_reason: str = None):
+ """Update database after transcode attempt."""
+ now = datetime.now().timestamp()
+
+ if success:
+ cursor = conn.execute("SELECT original_size FROM files WHERE path = ?", (filepath,))
+ row = cursor.fetchone()
+ original_size = row[0] if row else 0
+
+ space_saved = original_size - new_size
+ compression_ratio = new_size / original_size if original_size > 0 else 0
+
+ conn.execute("""
+ UPDATE files SET
+ status = 'done',
+ transcoded_at = ?,
+ transcoded_size = ?,
+ transcode_duration_secs = ?,
+ transcode_settings = ?,
+ space_saved = ?,
+ compression_ratio = ?,
+ failure_reason = NULL
+ WHERE path = ?
+ """, (now, new_size, duration_secs, json.dumps(settings),
+ space_saved, compression_ratio, filepath))
+
+ # Update running totals
+ conn.execute("""
+ UPDATE stats SET
+ total_files_transcoded = total_files_transcoded + 1,
+ total_space_saved = total_space_saved + ?,
+ total_transcode_time_secs = total_transcode_time_secs + ?,
+ last_updated = ?
+ WHERE id = 1
+ """, (space_saved, duration_secs, now))
+ else:
+ conn.execute("""
+ UPDATE files SET
+ status = 'failed',
+ failure_reason = ?
+ WHERE path = ?
+ """, (failure_reason, filepath))
+
+ conn.commit()
+
+
+def remove_from_cache(conn: sqlite3.Connection, filepath: str):
+ """Remove a file from the cache."""
+ conn.execute("DELETE FROM files WHERE path = ?", (filepath,))
+ conn.commit()
+
+
+def get_cache_stats(conn: sqlite3.Connection) -> dict:
+ """Get comprehensive stats from the cache."""
+ # File counts and sizes
+ cursor = conn.execute("""
+ SELECT
+ COUNT(*) as total,
+ SUM(CASE WHEN is_hevc = 1 THEN 1 ELSE 0 END) as hevc_count,
+ SUM(CASE WHEN is_hevc = 0 AND status = 'pending' THEN 1 ELSE 0 END) as pending_count,
+ SUM(CASE WHEN status = 'done' THEN 1 ELSE 0 END) as done_count,
+ SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_count,
+ SUM(CASE WHEN is_hevc = 1 THEN original_size ELSE 0 END) as hevc_size,
+ SUM(CASE WHEN is_hevc = 0 AND status = 'pending' THEN original_size ELSE 0 END) as pending_size
+ FROM files
+ """)
+ row = cursor.fetchone()
+ file_stats = {
+ "total": row[0] or 0,
+ "hevc_count": row[1] or 0,
+ "pending_count": row[2] or 0,
+ "done_count": row[3] or 0,
+ "failed_count": row[4] or 0,
+ "hevc_size": row[5] or 0,
+ "pending_size": row[6] or 0,
+ }
+
+ # Running totals
+ cursor = conn.execute("SELECT * FROM stats WHERE id = 1")
+ row = cursor.fetchone()
+ if row:
+ file_stats["total_transcoded"] = row[1] or 0
+ file_stats["total_space_saved"] = row[2] or 0
+ file_stats["total_transcode_time"] = row[3] or 0
+
+ # By directory breakdown
+ cursor = conn.execute("""
+ SELECT directory,
+ COUNT(*) as count,
+ SUM(CASE WHEN is_hevc = 0 AND status = 'pending' THEN original_size ELSE 0 END) as pending_size
+ FROM files
+ GROUP BY directory
+ ORDER BY pending_size DESC
+ LIMIT 10
+ """)
+ file_stats["by_directory"] = cursor.fetchall()
+
+ return file_stats
+
+# Ensure log directory exists (skip for /var/log which requires root)
+if not CONFIG["log_dir"].startswith("/var/"):
+ Path(CONFIG["log_dir"]).mkdir(parents=True, exist_ok=True)
+
+# Set up logging
+log_level = logging.DEBUG if VERBOSE else logging.INFO
+logging.basicConfig(
+ level=log_level,
+ format='%(asctime)s [%(levelname)s] %(message)s',
+ handlers=[
+ logging.FileHandler(CONFIG["log_file"]),
+ logging.StreamHandler(sys.stdout)
+ ]
+)
+log = logging.getLogger(__name__)
+
+
+def log_json(event: str, **data):
+ """Write structured JSON log entry (for machine parsing)."""
+ if not JSON_LOG:
+ return
+ entry = {
+ "ts": datetime.now().isoformat(),
+ "event": event,
+ **data
+ }
+ print(json.dumps(entry), file=sys.stderr)
+
+
+def format_bytes(size: int) -> str:
+ """Human-readable byte size."""
+ for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
+ if abs(size) < 1024.0:
+ return f"{size:.2f} {unit}"
+ size /= 1024.0
+ return f"{size:.2f} PB"
+
+
+def format_duration(seconds: float) -> str:
+ """Human-readable duration."""
+ if seconds < 60:
+ return f"{seconds:.1f}s"
+ elif seconds < 3600:
+ return f"{seconds/60:.1f}m"
+ else:
+ return f"{seconds/3600:.1f}h"
+
+
+def get_disk_space(path: str) -> Optional[dict]:
+ """Get disk space info for a path."""
+ try:
+ stat = os.statvfs(path)
+ total = stat.f_blocks * stat.f_frsize
+ free = stat.f_bavail * stat.f_frsize
+ used = total - free
+ return {
+ "total": total,
+ "free": free,
+ "used": used,
+ "percent_used": (used / total * 100) if total > 0 else 0
+ }
+ except Exception as e:
+ log.warning(f"Failed to get disk space for {path}: {e}")
+ return None
+
+
+def check_disk_space():
+ """Log disk space for all source directories."""
+ log.info("=" * 50)
+ log.info("Disk Space Check")
+ log.info("=" * 50)
+
+ checked_mounts = set()
+ for source_dir in CONFIG["source_dirs"]:
+ # Get mount point to avoid duplicate checks
+ try:
+ mount = subprocess.run(
+ ["df", "--output=target", source_dir],
+ capture_output=True, text=True
+ ).stdout.strip().split('\n')[-1]
+        except Exception:
+ mount = source_dir
+
+ if mount in checked_mounts:
+ continue
+ checked_mounts.add(mount)
+
+ space = get_disk_space(source_dir)
+ if space:
+ log.info(f" {mount}:")
+ log.info(f" Total: {format_bytes(space['total'])}")
+ log.info(f" Free: {format_bytes(space['free'])} ({100-space['percent_used']:.1f}% available)")
+ log_json("disk_space", mount=mount, **space)
+
+ if space['percent_used'] > 90:
+ log.warning(f" ⚠️ LOW DISK SPACE on {mount}!")
+ log.info("")
+
+
+def get_video_duration(filepath: str) -> Optional[float]:
+ """Get video duration in seconds using ffprobe."""
+ try:
+ result = subprocess.run([
+ "ffprobe", "-v", "quiet",
+ "-show_entries", "format=duration",
+ "-of", "json",
+ filepath
+ ], capture_output=True, text=True, timeout=30)
+
+ if result.returncode == 0:
+ data = json.loads(result.stdout)
+ return float(data.get("format", {}).get("duration", 0))
+ except Exception as e:
+ log.debug(f"Failed to get duration for {filepath}: {e}")
+ return None
+
+
+def handle_interrupt(signum, frame):
+ """Handle SIGINT/SIGTERM gracefully."""
+ log.warning("=" * 50)
+ log.warning("INTERRUPT RECEIVED - cleaning up...")
+ log.warning("=" * 50)
+
+ if CURRENT_TRANSCODE["path"]:
+ log.warning(f"Interrupted transcode: {CURRENT_TRANSCODE['path']}")
+ if CURRENT_TRANSCODE["start_time"]:
+ elapsed = (datetime.now() - CURRENT_TRANSCODE["start_time"]).total_seconds()
+ log.warning(f"Elapsed time: {format_duration(elapsed)}")
+
+ # Try to kill ffmpeg if running
+ if CURRENT_TRANSCODE["pid"]:
+ try:
+ os.kill(CURRENT_TRANSCODE["pid"], signal.SIGTERM)
+ log.info("Sent SIGTERM to ffmpeg process")
+        except OSError:
+ pass
+
+    # Clean up partial output (only if a transcode was actually in flight)
+    if CURRENT_TRANSCODE["path"]:
+        temp_path = Path(CURRENT_TRANSCODE["path"]).with_suffix(".transcoding.mkv")
+        if temp_path.exists():
+            log.info(f"Removing partial output: {temp_path}")
+            temp_path.unlink()
+
+ log_json("interrupted",
+ file=CURRENT_TRANSCODE["path"],
+ elapsed=elapsed if CURRENT_TRANSCODE["start_time"] else None)
+ sys.exit(1)
+
+
+# Register signal handlers
+signal.signal(signal.SIGINT, handle_interrupt)
+signal.signal(signal.SIGTERM, handle_interrupt)
+
+
+def get_video_codec(filepath: str) -> Optional[str]:
+ """Get the video codec of a file using ffprobe."""
+ try:
+ result = subprocess.run([
+ "ffprobe", "-v", "quiet",
+ "-select_streams", "v:0",
+ "-show_entries", "stream=codec_name",
+ "-of", "json",
+ filepath
+ ], capture_output=True, text=True, timeout=30)
+
+ if result.returncode == 0:
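+            # ffprobe with "-of json" returns roughly {"streams": [{"codec_name": "h264"}]} for the selected stream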
+ data = json.loads(result.stdout)
+ streams = data.get("streams", [])
+ if streams:
+ return streams[0].get("codec_name", "").lower()
+ except Exception as e:
+ log.warning(f"Failed to probe {filepath}: {e}")
+ return None
+
+
+def is_hevc(filepath: str) -> bool:
+ """Check if a file is already HEVC encoded."""
+ codec = get_video_codec(filepath)
+ return codec in ["hevc", "h265"]
+
+
+def get_file_size_gb(filepath: str) -> float:
+ """Get file size in GB."""
+ return os.path.getsize(filepath) / (1024 ** 3)
+
+
+def find_videos_to_transcode(conn: sqlite3.Connection) -> List[Dict]:
+ """Find all non-HEVC videos, sorted by size (biggest first). Uses cache for speed."""
+ videos = []
+ cache_hits = 0
+ cache_misses = 0
+ files_scanned = 0
+ dirs_scanned = 0
+
+ scan_start = datetime.now()
+ log.info("Scanning source directories...")
+
+ for source_dir in CONFIG["source_dirs"]:
+ if not os.path.exists(source_dir):
+ log.warning(f" ⚠ Source directory not found: {source_dir}")
+ continue
+
+ dir_start = datetime.now()
+ dir_files = 0
+ dir_size = 0
+ log.info(f" 📁 Scanning: {source_dir}")
+
+ for root, dirs, files in os.walk(source_dir):
+ # Skip cleanup directory
+ if ".cleanup" in root:
+ continue
+
+ dirs_scanned += 1
+
+ for filename in files:
+ ext = os.path.splitext(filename)[1].lower()
+ if ext not in CONFIG["video_extensions"]:
+ continue
+
+ filepath = os.path.join(root, filename)
+ files_scanned += 1
+ dir_files += 1
+
+ try:
+ stat = os.stat(filepath)
+ size = stat.st_size
+ mtime = stat.st_mtime
+ dir_size += size
+
+ # Check cache first
+ cached = get_cached_file(conn, filepath, size, mtime)
+ if cached:
+ video_codec = cached["video_codec"]
+ audio_codec = cached["audio_codec"]
+ is_hevc = cached["is_hevc"]
+ status = cached["status"]
+ cache_hits += 1
+ else:
+ # Need to probe this file
+ log.debug(f" Probing: {filename}")
+ video_codec = get_video_codec(filepath)
+ audio_codec = get_audio_codec(filepath)
+ is_hevc = video_codec in ["hevc", "h265"] if video_codec else False
+ status = "done" if is_hevc else "pending"
+ cache_file(conn, filepath, size, mtime,
+ video_codec or "unknown", audio_codec or "unknown", is_hevc)
+ cache_misses += 1
+
+ videos.append({
+ "path": filepath,
+ "size": size,
+ "size_gb": size / (1024 ** 3),
+ "video_codec": video_codec,
+ "audio_codec": audio_codec,
+ "is_hevc": is_hevc,
+ "status": status
+ })
+ except OSError as e:
+ log.warning(f" ⚠ Cannot access {filepath}: {e}")
+
+ dir_elapsed = (datetime.now() - dir_start).total_seconds()
+ log.info(f" Found {dir_files} videos ({format_bytes(dir_size)}) in {format_duration(dir_elapsed)}")
+ log_json("scan_directory",
+ directory=source_dir,
+ files=dir_files,
+ size=dir_size,
+ elapsed=dir_elapsed)
+
+ scan_elapsed = (datetime.now() - scan_start).total_seconds()
+ cache_rate = (cache_hits / (cache_hits + cache_misses) * 100) if (cache_hits + cache_misses) > 0 else 0
+
+ log.info("")
+ log.info(f"Scan complete in {format_duration(scan_elapsed)}")
+ log.info(f" Files scanned: {files_scanned}")
+ log.info(f" Dirs traversed: {dirs_scanned}")
+ log.info(f" Cache hits: {cache_hits} ({cache_rate:.1f}%)")
+ log.info(f" Cache misses: {cache_misses} (probed)")
+ log.info("")
+
+ log_json("scan_complete",
+ files=files_scanned,
+ dirs=dirs_scanned,
+ cache_hits=cache_hits,
+ cache_misses=cache_misses,
+ elapsed=scan_elapsed)
+
+ # Sort by size descending (biggest first)
+ videos.sort(key=lambda x: x["size"], reverse=True)
+ return videos
+
+
+def is_past_cutoff() -> bool:
+ """Check if we're past the cutoff time for starting new encodes."""
+ now = datetime.now().time()
+ return now >= CONFIG["cutoff_time"]
+
+
+def verify_output(filepath: str) -> bool:
+    """Verify the transcoded file exists, is a plausible size, and is actually HEVC."""
+    if not os.path.exists(filepath):
+        return False
+
+    # Sanity-check the size: anything under 1 MB is certainly a broken encode
+    size = os.path.getsize(filepath)
+    if size < 1024 * 1024:
+        return False
+
+ # Check it's valid video with ffprobe
+ codec = get_video_codec(filepath)
+ if codec != "hevc":
+ return False
+
+ return True
+
+
+def transcode_file(input_path: str, conn: sqlite3.Connection) -> Optional[dict]:
+ """Transcode a single file to HEVC. Returns result dict on success."""
+
+ input_file = Path(input_path)
+ output_path = input_file.with_suffix(".transcoding.mkv")
+ final_path = input_file.with_suffix(".mkv")
+
+ original_size = os.path.getsize(input_path)
+ video_duration = get_video_duration(input_path)
+
+ log.info("")
+ log.info("=" * 60)
+ log.info(f"TRANSCODE START: {input_file.name}")
+ log.info("=" * 60)
+ log.info(f" Input path: {input_path}")
+ log.info(f" Output path: {output_path}")
+ log.info(f" Original size: {format_bytes(original_size)}")
+ if video_duration:
+ log.info(f" Duration: {format_duration(video_duration)}")
+
+ log_json("transcode_start",
+ file=input_path,
+ size=original_size,
+ duration=video_duration)
+
+ # Mark as transcoding in DB
+ conn.execute("UPDATE files SET status = 'transcoding' WHERE path = ?", (input_path,))
+ conn.commit()
+
+ start_time = datetime.now()
+ CURRENT_TRANSCODE["path"] = input_path
+ CURRENT_TRANSCODE["start_time"] = start_time
+
+ settings = {
+ "crf": CONFIG["crf"],
+ "preset": CONFIG["preset"],
+ "audio_codec": CONFIG["audio_codec"],
+ "audio_bitrate": CONFIG["audio_bitrate"],
+ "threads": CONFIG["threads"]
+ }
+
+ # Build ffmpeg command
+ cmd = [
+ "ffmpeg", "-y",
+ "-threads", str(CONFIG["threads"]),
+ "-progress", "pipe:1", # Progress to stdout for parsing
+ "-i", input_path,
+ "-c:v", "libx265",
+ "-preset", CONFIG["preset"],
+ "-crf", str(CONFIG["crf"]),
+ "-x265-params", f"pools={CONFIG['threads']}",
+ "-c:a", CONFIG["audio_codec"],
+ "-b:a", CONFIG["audio_bitrate"],
+ "-c:s", "copy", # Copy subtitles
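+        # Caveat: some subtitle codecs (e.g. mov_text from .mp4 sources) cannot be
+        # stream-copied into Matroska; ffmpeg then exits non-zero and the file is recorded as failed.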
+ str(output_path)
+ ]
+
+ # Log the full command for debugging
+ cmd_str = ' '.join(f'"{c}"' if ' ' in c else c for c in cmd)
+ log.info(f" FFmpeg command:")
+ log.debug(f" {cmd_str}")
+ if not VERBOSE:
+ log.info(f" (use --verbose to see full command)")
+ log.info("")
+ log_json("ffmpeg_command", command=cmd_str)
+
+ # Log stderr to file to prevent pipe buffer deadlock
+ stderr_log_path = Path(CONFIG["log_dir"]) / "transcoder_ffmpeg_stderr.log"
+ stderr_file = open(stderr_log_path, "a")
+ stderr_file.write(f"\n{'='*60}\n{datetime.now()} - {input_path}\n{'='*60}\n")
+ stderr_file.flush()
+
+ try:
+ # Use Popen for progress monitoring
+ # stderr goes to file (not PIPE) to prevent buffer deadlock
+ process = subprocess.Popen(
+ cmd,
+ stdout=subprocess.PIPE,
+ stderr=stderr_file,
+ text=True
+ )
+ CURRENT_TRANSCODE["pid"] = process.pid
+ log.debug(f" FFmpeg PID: {process.pid}")
+
+ # Monitor progress
+ last_progress_log = datetime.now()
+ progress_interval = 60 # Log progress every 60 seconds
+ current_time_us = 0
+
+        last_output_size = 0
+ last_output_growth = datetime.now()
+ stall_timeout = 600 # 10 minutes with no output growth = stalled
+
+ while True:
+ # Use select for non-blocking read with timeout
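+            # The 30s timeout lets the loop fall through to the periodic progress/stall checks below even when ffmpeg prints nothing.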
+ ready, _, _ = select.select([process.stdout], [], [], 30)
+
+ if ready:
+ line = process.stdout.readline()
+ if not line and process.poll() is not None:
+ break
+
+ # Parse progress output
+ if line.startswith("out_time_us="):
+ try:
+ current_time_us = int(line.split("=")[1])
+                    except ValueError:
+ pass
+ elif process.poll() is not None:
+ break
+
+ # Log progress periodically
+ now = datetime.now()
+ if (now - last_progress_log).total_seconds() >= progress_interval:
+ elapsed = (now - start_time).total_seconds()
+
+ if video_duration and current_time_us > 0:
+ current_secs = current_time_us / 1_000_000
+ percent = (current_secs / video_duration) * 100
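+                    # ETA = estimated total wall time (elapsed / fraction complete) minus time already spent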
+ eta_secs = (elapsed / percent * 100) - elapsed if percent > 0 else 0
+
+ log.info(f" ⏳ Progress: {percent:.1f}% | Elapsed: {format_duration(elapsed)} | ETA: {format_duration(eta_secs)}")
+ log_json("transcode_progress",
+ file=input_path,
+ percent=percent,
+ elapsed=elapsed,
+ eta=eta_secs)
+ else:
+ log.info(f" ⏳ Elapsed: {format_duration(elapsed)}")
+
+ # Check output size growth and detect stalls
+ if output_path.exists():
+ current_size = os.path.getsize(output_path)
+ log.info(f" Output size: {format_bytes(current_size)}")
+
+ if current_size > last_output_size:
+ last_output_size = current_size
+ last_output_growth = now
+ elif (now - last_output_growth).total_seconds() > stall_timeout:
+ log.error(f" 🛑 STALL DETECTED: Output file hasn't grown in {stall_timeout}s")
+ log.error(f" Output stuck at {format_bytes(current_size)}")
+ log.error(f" Killing ffmpeg (PID {process.pid})")
+ process.kill()
+ process.wait(timeout=30)
+
+ # Mark as stalled in DB with timestamp for 7-day cooldown
+ conn.execute(
+ "UPDATE files SET status = 'stalled', failure_reason = ?, transcoded_at = ? WHERE path = ?",
+ (f"Output stalled at {format_bytes(current_size)} after {format_duration(elapsed)}",
+ datetime.now().timestamp(), input_path)
+ )
+ conn.commit()
+
+ # Clean up partial output
+ if output_path.exists():
+ output_path.unlink()
+ log.info(f" Cleaned up partial file: {output_path}")
+
+ stderr_file.close()
+ return None
+
+ last_progress_log = now
+
+ # Wait for process to finish (stderr already going to file)
+ process.wait(timeout=60)
+ stderr_file.close()
+
+ if process.returncode != 0:
+ # Read last 1000 chars of stderr log for error reporting
+ try:
+ with open(stderr_log_path, 'r') as f:
+ f.seek(max(0, os.path.getsize(stderr_log_path) - 1000))
+ error_msg = f.read()
+            except OSError:
+ error_msg = "Unknown error (check transcoder_ffmpeg_stderr.log)"
+ log.error("=" * 60)
+ log.error("TRANSCODE FAILED")
+ log.error("=" * 60)
+ log.error(f" File: {input_path}")
+ log.error(f" Exit code: {process.returncode}")
+ log.error(f" FFmpeg error output:")
+ for line in error_msg.split('\n')[-20:]: # Last 20 lines
+ if line.strip():
+ log.error(f" {line}")
+ log.error("=" * 60)
+
+ log_json("transcode_failed",
+ file=input_path,
+ exit_code=process.returncode,
+ error=error_msg[-500:])
+
+ if output_path.exists():
+ output_path.unlink()
+ update_transcode_result(conn, input_path, None, 0, 0, settings, False, error_msg[-500:])
+ CURRENT_TRANSCODE["path"] = None
+ return None
+
+    except subprocess.TimeoutExpired:
+        log.error("=" * 60)
+        log.error(f"TRANSCODE TIMEOUT: ffmpeg did not exit when waited on: {input_path}")
+        log.error("=" * 60)
+        log_json("transcode_timeout", file=input_path)
+        process.kill()
+        stderr_file.close()
+        if output_path.exists():
+            output_path.unlink()
+        update_transcode_result(conn, input_path, None, 0, 0, settings, False, "Timed out waiting for ffmpeg to exit")
+        CURRENT_TRANSCODE["path"] = None
+        return None
+ except Exception as e:
+ log.error("=" * 60)
+ log.error(f"TRANSCODE ERROR: {input_path}")
+ log.error("=" * 60)
+ log.error(f" Exception: {type(e).__name__}: {e}")
+ log.error(f" Traceback:")
+ for line in traceback.format_exc().split('\n'):
+ if line.strip():
+ log.error(f" {line}")
+ log.error("=" * 60)
+
+ log_json("transcode_error",
+ file=input_path,
+ error=str(e),
+ traceback=traceback.format_exc())
+
+        stderr_file.close()
+        if output_path.exists():
+            output_path.unlink()
+ update_transcode_result(conn, input_path, None, 0, 0, settings, False, str(e))
+ CURRENT_TRANSCODE["path"] = None
+ return None
+
+ # Verify output
+ log.info(" Verifying output file...")
+ if not verify_output(str(output_path)):
+ log.error("=" * 60)
+ log.error("OUTPUT VERIFICATION FAILED")
+ log.error("=" * 60)
+ log.error(f" File: {output_path}")
+ if output_path.exists():
+ log.error(f" Output size: {format_bytes(os.path.getsize(output_path))}")
+ output_codec = get_video_codec(str(output_path))
+ log.error(f" Output codec: {output_codec}")
+ else:
+ log.error(" Output file does not exist!")
+ log.error("=" * 60)
+
+ log_json("transcode_verify_failed", file=input_path)
+ if output_path.exists():
+ output_path.unlink()
+ update_transcode_result(conn, input_path, None, 0, 0, settings, False, "Output verification failed")
+ CURRENT_TRANSCODE["path"] = None
+ return None
+
+ new_size = os.path.getsize(output_path)
+ duration_secs = (datetime.now() - start_time).total_seconds()
+ space_saved = original_size - new_size
+ compression_ratio = new_size / original_size if original_size > 0 else 0
+ encode_speed = video_duration / duration_secs if video_duration and duration_secs > 0 else 0
+
+ log.info("")
+ log.info("=" * 60)
+ log.info("✅ TRANSCODE COMPLETE")
+ log.info("=" * 60)
+ log.info(f" File: {input_file.name}")
+ log.info(f" Duration: {format_duration(duration_secs)}")
+ log.info(f" Original: {format_bytes(original_size)}")
+ log.info(f" New size: {format_bytes(new_size)}")
+ log.info(f" Saved: {format_bytes(space_saved)} ({100*(1-compression_ratio):.1f}% reduction)")
+ log.info(f" Ratio: {compression_ratio:.2f}x")
+ if encode_speed > 0:
+ log.info(f" Speed: {encode_speed:.2f}x realtime")
+ log.info("=" * 60)
+ log.info("")
+
+ log_json("transcode_complete",
+ file=input_path,
+ original_size=original_size,
+ new_size=new_size,
+ space_saved=space_saved,
+ duration_secs=duration_secs,
+ compression_ratio=compression_ratio,
+ encode_speed=encode_speed)
+
+ # Move original to cleanup directory
+ cleanup_dir = Path(CONFIG["cleanup_dir"])
+ cleanup_dir.mkdir(parents=True, exist_ok=True)
+
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+ cleanup_name = f"{timestamp}_{input_file.name}"
+ cleanup_path = cleanup_dir / cleanup_name
+
+ log.info(f" Moving original to cleanup: {cleanup_path}")
+ try:
+ shutil.move(input_path, cleanup_path)
+ log.info(f" ✓ Original moved successfully")
+ log_json("original_moved", source=input_path, dest=str(cleanup_path))
+ except Exception as e:
+ log.error(f" ✗ Failed to move original: {e}")
+ log.error(f" Traceback: {traceback.format_exc()}")
+ # Don't fail the whole operation, but keep both files
+
+ # Rename output to final name
+ log.info(f" Renaming output: {output_path} -> {final_path}")
+ try:
+ output_path.rename(final_path)
+ log.info(f" ✓ Output renamed successfully")
+ except Exception as e:
+ log.error(f" ✗ Failed to rename output: {e}")
+ final_path = output_path
+
+ # Fix permissions - mediaserver group
+ log.debug(" Setting permissions (mediaserver:664)")
+ try:
+ subprocess.run(["chgrp", "mediaserver", str(final_path)], check=False)
+ subprocess.run(["chmod", "664", str(final_path)], check=False)
+ log.debug(" ✓ Permissions set")
+ except Exception as e:
+ log.warning(f" ✗ Failed to set permissions: {e}")
+
+ # Update database with success
+ update_transcode_result(conn, input_path, str(final_path), new_size, duration_secs, settings, True)
+
+ CURRENT_TRANSCODE["path"] = None
+ return {
+ "output_path": str(final_path),
+ "new_size": new_size,
+ "duration_secs": duration_secs,
+ "space_saved": space_saved
+ }
+
+
+def cleanup_old_files():
+ """Delete original files older than cleanup_days."""
+ cleanup_dir = Path(CONFIG["cleanup_dir"])
+ if not cleanup_dir.exists():
+ log.debug("Cleanup directory doesn't exist, skipping cleanup")
+ return
+
+ log.info("Checking cleanup directory for old originals...")
+ cutoff = datetime.now().timestamp() - (CONFIG["cleanup_days"] * 24 * 3600)
+ cutoff_date = datetime.fromtimestamp(cutoff).strftime('%Y-%m-%d %H:%M')
+ log.debug(f" Cutoff date: {cutoff_date} ({CONFIG['cleanup_days']} days ago)")
+
+ deleted_count = 0
+ deleted_size = 0
+ kept_count = 0
+ kept_size = 0
+
+ for filepath in cleanup_dir.iterdir():
+ if filepath.is_file():
+ mtime = filepath.stat().st_mtime
+ size = filepath.stat().st_size
+
+ if mtime < cutoff:
+ try:
+ filepath.unlink()
+ deleted_count += 1
+ deleted_size += size
+ log.info(f" 🗑️ Deleted: {filepath.name} ({format_bytes(size)})")
+ log_json("cleanup_deleted", file=str(filepath), size=size)
+ except Exception as e:
+ log.warning(f" ⚠ Failed to delete {filepath}: {e}")
+ else:
+ kept_count += 1
+ kept_size += size
+ days_old = (datetime.now().timestamp() - mtime) / 86400
+ log.debug(f" ⏳ Keeping: {filepath.name} ({days_old:.1f} days old)")
+
+ if deleted_count > 0:
+ log.info(f" Cleanup: deleted {deleted_count} files, freed {format_bytes(deleted_size)}")
+ log_json("cleanup_complete", deleted=deleted_count, freed=deleted_size)
+ else:
+ log.info(f" Cleanup: nothing to delete ({kept_count} files still in retention)")
+
+ if kept_count > 0:
+        log.debug(f"  Retention: {kept_count} files ({format_bytes(kept_size)}) waiting")
+
+
+def main():
+ # Initialize database
+ conn = init_db()
+
+ # Handle --stats flag
+ if args.stats:
+ stats = get_cache_stats(conn)
+ print("=" * 60)
+ print("Transcoder Statistics")
+ print("=" * 60)
+ print(f"Total files scanned: {stats['total']}")
+ print(f"Already HEVC: {stats['hevc_count']} ({stats['hevc_size']/(1024**3):.2f} GB)")
+ print(f"Pending transcode: {stats['pending_count']} ({stats['pending_size']/(1024**3):.2f} GB)")
+ print(f"Completed: {stats['done_count']}")
+ print(f"Failed: {stats['failed_count']}")
+ print("")
+ print("--- Lifetime Stats ---")
+ print(f"Total transcoded: {stats.get('total_transcoded', 0)} files")
+ print(f"Total space saved: {stats.get('total_space_saved', 0)/(1024**3):.2f} GB")
+ total_time = stats.get('total_transcode_time', 0)
+ print(f"Total transcode time: {total_time/3600:.1f} hours")
+ if stats['pending_count'] > 0:
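+            # Rough heuristic: HEVC typically roughly halves the size of non-HEVC sources (same assumption as the dry-run estimate).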
+ est_savings = stats['pending_size'] * 0.5
+ print(f"\nEstimated remaining: ~{est_savings/(1024**3):.2f} GB savings")
+
+ if stats.get('by_directory'):
+ print("\n--- Top Directories (by pending size) ---")
+ for dir_path, count, pending in stats['by_directory'][:5]:
+ if pending > 0:
+ # Shorten path for display
+ short = dir_path.replace("/disks/Plex/", "")
+ print(f" {short}: {pending/(1024**3):.2f} GB pending")
+
+ print("=" * 60)
+ conn.close()
+ return
+
+ # Handle --clear-cache flag
+ if args.clear_cache:
+ conn.execute("DELETE FROM files")
+ conn.execute("UPDATE stats SET total_files_transcoded=0, total_space_saved=0, total_transcode_time_secs=0")
+ conn.commit()
+ print("Cache cleared.")
+ conn.close()
+ return
+
+ # Handle --failed flag
+ if args.failed:
+ cursor = conn.execute("""
+ SELECT path, original_size, failure_reason
+ FROM files WHERE status = 'failed'
+ ORDER BY original_size DESC
+ """)
+ rows = cursor.fetchall()
+ print("=" * 60)
+ print(f"Failed Transcodes ({len(rows)} files)")
+ print("=" * 60)
+ for path, size, reason in rows:
+ print(f"\n{path}")
+ print(f" Size: {size/(1024**3):.2f} GB")
+ print(f" Reason: {reason}")
+ if not rows:
+ print("No failed transcodes!")
+ print("=" * 60)
+ conn.close()
+ return
+
+ # Handle --retry-failed flag
+ if args.retry_failed:
+ cursor = conn.execute("UPDATE files SET status = 'pending', failure_reason = NULL WHERE status = 'failed'")
+ count = cursor.rowcount
+ conn.commit()
+ print(f"Reset {count} failed files to pending.")
+ conn.close()
+ return
+
+ session_start = datetime.now()
+
+ log.info("")
+ log.info("=" * 70)
+ log.info(" OVERNIGHT VIDEO TRANSCODER")
+ log.info("=" * 70)
+ log.info(f" Started: {session_start.strftime('%Y-%m-%d %H:%M:%S')}")
+ log.info(f" Mode: {'DRY RUN' if DRY_RUN else 'LIVE'}")
+ log.info(f" Verbose: {'YES' if VERBOSE else 'NO'}")
+ log.info(f" JSON log: {'YES' if JSON_LOG else 'NO'}")
+ log.info(f" Cutoff: {CONFIG['cutoff_time']}")
+ log.info(f" Settings: CRF={CONFIG['crf']}, preset={CONFIG['preset']}, audio={CONFIG['audio_codec']}@{CONFIG['audio_bitrate']}")
+ log.info("=" * 70)
+ log.info("")
+
+ log_json("session_start",
+ mode="dry_run" if DRY_RUN else "live",
+ config=CONFIG)
+
+ # Check disk space first
+ check_disk_space()
+
+ # Run cleanup first (skip in dry run)
+ if not DRY_RUN:
+ cleanup_old_files()
+
+ # Find videos to transcode
+ videos = find_videos_to_transcode(conn)
+
+ # Filter to non-HEVC pending only (using cached codec info)
+ to_transcode = []
+ hevc_count = 0
+ hevc_size = 0
+
+ for video in videos:
+ if video.get("is_hevc", False) or video.get("status") == "done":
+ hevc_count += 1
+ hevc_size += video["size"]
+ elif video.get("status") == "pending":
+ to_transcode.append(video)
+
+ log.info(f"Already HEVC: {hevc_count} files ({hevc_size / (1024**3):.2f} GB)")
+ log.info(f"Need transcoding: {len(to_transcode)} files")
+
+ if DRY_RUN:
+ log.info("")
+ log.info("=" * 60)
+ log.info("DRY RUN - Files that would be transcoded (biggest first):")
+ log.info("=" * 60)
+
+ total_size = 0
+ estimated_savings = 0
+
+ for i, video in enumerate(to_transcode[:20], 1): # Show top 20
+ size_gb = video["size_gb"]
+ total_size += video["size"]
+ # Estimate ~50% compression for HEVC
+ est_saved = video["size"] * 0.5
+ estimated_savings += est_saved
+
+ log.info(f"{i:2}. [{size_gb:6.2f} GB] {video['path']}")
+
+ if len(to_transcode) > 20:
+ log.info(f" ... and {len(to_transcode) - 20} more files")
+
+ log.info("")
+ log.info("=" * 60)
+ log.info(f"Total to transcode: {total_size / (1024**3):.2f} GB")
+ log.info(f"Estimated savings (~50%): {estimated_savings / (1024**3):.2f} GB")
+ log.info("=" * 60)
+ return
+
+ # Normal run
+ transcoded_count = 0
+ total_saved = 0
+ failed_count = 0
+ skipped_count = 0
+
+ log.info("")
+ log.info("=" * 60)
+ log.info("Starting transcode queue")
+ log.info("=" * 60)
+ log.info(f" Files to process: {len(to_transcode)}")
+ log.info(f" Total size: {format_bytes(sum(v['size'] for v in to_transcode))}")
+ log.info("")
+
+ for i, video in enumerate(to_transcode, 1):
+ # Check cutoff time
+ if is_past_cutoff():
+ remaining = len(to_transcode) - i + 1
+ log.info("")
+ log.info("=" * 60)
+ log.info(f"⏰ CUTOFF TIME REACHED ({CONFIG['cutoff_time']})")
+ log.info(f" Stopping with {remaining} files remaining in queue")
+ log.info("=" * 60)
+ log_json("cutoff_reached", remaining_files=remaining)
+ break
+
+ # Skip already done or failed
+ if video.get("status") in ["done", "failed"]:
+ log.debug(f" Skipping (status={video['status']}): {video['path']}")
+ skipped_count += 1
+ continue
+
+ # Skip stalled files unless 7 days have passed
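+        # (For stalled files, transcoded_at stores the stall timestamp; see the stall handler in transcode_file.)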
+ if video.get("status") == "stalled":
+ row = conn.execute(
+ "SELECT transcoded_at FROM files WHERE path = ?", (video['path'],)
+ ).fetchone()
+ if row and row[0]:
+ stalled_at = datetime.fromtimestamp(row[0])
+ days_since = (datetime.now() - stalled_at).days
+ if days_since < 7:
+ log.debug(f" Skipping stalled file ({days_since}d ago, retry after 7d): {video['path']}")
+ skipped_count += 1
+ continue
+ else:
+ log.info(f" Retrying stalled file ({days_since}d cooldown passed): {video['path']}")
+ else:
+ log.debug(f" Skipping stalled file (no timestamp): {video['path']}")
+ skipped_count += 1
+ continue
+
+ log.info(f"[{i}/{len(to_transcode)}] Queued: {Path(video['path']).name}")
+ log.info(f" Size: {format_bytes(video['size'])}")
+ log.info(f" Codec: video={video.get('video_codec', 'unknown')}, audio={video.get('audio_codec', 'unknown')}")
+ log.info(f" Path: {video['path']}")
+
+ log_json("transcode_queued",
+ index=i,
+ total=len(to_transcode),
+ file=video['path'],
+ size=video['size'],
+ video_codec=video.get('video_codec'),
+ audio_codec=video.get('audio_codec'))
+
+ # Transcode
+ result = transcode_file(video["path"], conn)
+
+ if result:
+ total_saved += result["space_saved"]
+ transcoded_count += 1
+ log.info(f" Running totals: {transcoded_count} done, {format_bytes(total_saved)} saved")
+ else:
+ failed_count += 1
+ log.error(f" ❌ Transcode #{i} failed: {video['path']}")
+ log.info(f" Running totals: {transcoded_count} done, {failed_count} failed")
+
+ session_end = datetime.now()
+ session_duration = (session_end - session_start).total_seconds()
+
+ log.info("")
+ log.info("=" * 70)
+ log.info(" SESSION COMPLETE")
+ log.info("=" * 70)
+ log.info(f" Started: {session_start.strftime('%Y-%m-%d %H:%M:%S')}")
+ log.info(f" Ended: {session_end.strftime('%Y-%m-%d %H:%M:%S')}")
+ log.info(f" Duration: {format_duration(session_duration)}")
+ log.info(f" Transcoded: {transcoded_count} files")
+ log.info(f" Failed: {failed_count} files")
+ log.info(f" Skipped: {skipped_count} files")
+ log.info(f" Space saved: {format_bytes(total_saved)}")
+
+ # Get updated lifetime stats
+ stats = get_cache_stats(conn)
+ log.info("")
+ log.info(" --- Lifetime Totals ---")
+ log.info(f" Total transcoded: {stats.get('total_transcoded', 0)} files")
+ log.info(f" Total saved: {format_bytes(stats.get('total_space_saved', 0))}")
+ log.info(f" Pending: {stats.get('pending_count', 0)} files ({format_bytes(stats.get('pending_size', 0))})")
+ log.info("=" * 70)
+ log.info("")
+
+ log_json("session_complete",
+ started=session_start.isoformat(),
+ ended=session_end.isoformat(),
+ duration_secs=session_duration,
+ transcoded=transcoded_count,
+ space_saved=total_saved,
+ lifetime_transcoded=stats.get('total_transcoded', 0),
+ lifetime_saved=stats.get('total_space_saved', 0),
+ pending_count=stats.get('pending_count', 0))
+
+ conn.close()
+
+
+if __name__ == "__main__":
+ main()