| author | Caine <susan@tomflux.xyz> | 2026-02-15 09:41:49 +0000 |
|---|---|---|
| committer | Caine <susan@tomflux.xyz> | 2026-02-15 09:41:49 +0000 |
| commit | c7956ae9b228054d57897ea338ad4154cc0b7221 (patch) | |
| tree | a1f517ea452dddf34468c01c85871358e8b7295e /overnight_transcoder.py | |
Overnight transcoding, music discovery/import, system health reports,
stats page generator, and bookmark management.
Secrets stored in /etc/automation/ — not in repo.
Diffstat (limited to 'overnight_transcoder.py')
| -rwxr-xr-x | overnight_transcoder.py | 1245 |
1 files changed, 1245 insertions, 0 deletions
diff --git a/overnight_transcoder.py b/overnight_transcoder.py
new file mode 100755
index 0000000..8a6d7d0
--- /dev/null
+++ b/overnight_transcoder.py
@@ -0,0 +1,1245 @@
+#!/usr/bin/env python3
+"""
+Overnight Video Transcoder
+Transcodes large video files to HEVC to save space.
+Runs via cron between 02:00-07:00.
+
+Usage:
+    overnight_transcoder.py            # Normal run
+    overnight_transcoder.py --dry      # Dry run - show what would be transcoded
+    overnight_transcoder.py --stats    # Show statistics
+    overnight_transcoder.py --verbose  # Extra debug output
+"""
+
+import os
+import sys
+import json
+import subprocess
+import shutil
+import logging
+import argparse
+import sqlite3
+import traceback
+import re
+import signal
+from pathlib import Path
+from datetime import datetime, time
+from typing import Optional, List, Dict, Tuple
+from contextlib import contextmanager
+
+# Parse arguments
+parser = argparse.ArgumentParser(description="Overnight video transcoder")
+parser.add_argument("--dry", action="store_true", help="Dry run - don't transcode, just show what would happen")
+parser.add_argument("--stats", action="store_true", help="Show cache statistics and exit")
+parser.add_argument("--failed", action="store_true", help="Show failed transcodes and exit")
+parser.add_argument("--retry-failed", action="store_true", help="Reset failed files to pending for retry")
+parser.add_argument("--clear-cache", action="store_true", help="Clear the cache and exit")
+parser.add_argument("--verbose", "-v", action="store_true", help="Verbose debug output")
+parser.add_argument("--json-log", action="store_true", help="Output structured JSON logs (one per line)")
+args = parser.parse_args()
+
+DRY_RUN = args.dry
+VERBOSE = args.verbose
+JSON_LOG = args.json_log
+
+# Track current transcode for signal handling
+CURRENT_TRANSCODE = {"path": None, "pid": None, "start_time": None}
+
+# Configuration
+CONFIG = {
+    "source_dirs": [
+        "/disks/Plex/Anime",
+        "/disks/Plex/Films",
+        "/disks/Plex/TV",
+    ],
+    "cleanup_dir": "/disks/Plex/.cleanup",
+    "log_dir": "/var/log",
+    "log_file": "/var/log/transcoder.log",
+    "db_file": "/var/lib/transcoder/cache.db",
+    "video_extensions": [".mkv", ".mp4", ".avi", ".m4v", ".mov", ".wmv"],
+    "cutoff_time": time(6, 30),  # Don't start new encodes after 06:30
+    "cleanup_days": 7,           # Delete originals after 7 days
+
+    # FFmpeg settings
+    "crf": 20,
+    "preset": "slow",
+    "audio_codec": "ac3",
+    "audio_bitrate": "640k",
+    "threads": 12,
+}
+
+
+def init_db() -> sqlite3.Connection:
+    """Initialize the SQLite database for caching file info."""
+    db_path = Path(CONFIG["db_file"])
+    db_path.parent.mkdir(parents=True, exist_ok=True)
+
+    conn = sqlite3.connect(CONFIG["db_file"])
+
+    # Main files table - tracks all video files
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS files (
+            path TEXT PRIMARY KEY,
+            filename TEXT,
+            directory TEXT,
+            original_size INTEGER,
+            mtime REAL,
+            video_codec TEXT,
+            audio_codec TEXT,
+            is_hevc INTEGER,
+            first_seen REAL,
+            scanned_at REAL,
+
+            -- Transcode info (NULL if not transcoded)
+            status TEXT DEFAULT 'pending',  -- pending, transcoding, done, failed, stalled
+            transcoded_at REAL,
+            transcoded_size INTEGER,
+            transcode_duration_secs REAL,
+            transcode_settings TEXT,  -- JSON: {crf, preset, audio_codec, audio_bitrate}
+            failure_reason TEXT,
+
+            -- Computed (updated after transcode)
+            space_saved INTEGER,
+            compression_ratio REAL
+        )
+    """)
+    conn.execute("CREATE INDEX IF NOT EXISTS idx_is_hevc ON files(is_hevc)")
+    conn.execute("CREATE INDEX IF NOT EXISTS idx_status ON files(status)")
+    conn.execute("CREATE INDEX IF NOT EXISTS idx_size ON files(original_size)")
+    conn.execute("CREATE INDEX IF NOT EXISTS idx_directory ON files(directory)")
+
+    # Stats summary table - running totals
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS stats (
+            id INTEGER PRIMARY KEY CHECK (id = 1),  -- Single row
+            total_files_transcoded INTEGER DEFAULT 0,
+            total_space_saved INTEGER DEFAULT 0,
+            total_transcode_time_secs REAL DEFAULT 0,
+            last_updated REAL
+        )
+    """)
+    conn.execute("INSERT OR IGNORE INTO stats (id) VALUES (1)")
+
+    conn.commit()
+    return conn
+
+
+def get_audio_codec(filepath: str) -> Optional[str]:
+    """Get the audio codec of a file using ffprobe."""
+    try:
+        result = subprocess.run([
+            "ffprobe", "-v", "quiet",
+            "-select_streams", "a:0",
+            "-show_entries", "stream=codec_name",
+            "-of", "json",
+            filepath
+        ], capture_output=True, text=True, timeout=30)
+
+        if result.returncode == 0:
+            data = json.loads(result.stdout)
+            streams = data.get("streams", [])
+            if streams:
+                return streams[0].get("codec_name", "").lower()
+    except Exception as e:
+        log.warning(f"Failed to probe audio {filepath}: {e}")
+    return None
+
+
+def get_cached_file(conn: sqlite3.Connection, filepath: str, size: int, mtime: float) -> Optional[dict]:
+    """Get cached file info if file hasn't changed. Returns dict or None."""
+    cursor = conn.execute(
+        """SELECT video_codec, audio_codec, is_hevc, status
+           FROM files WHERE path = ? AND original_size = ? AND mtime = ?""",
+        (filepath, size, mtime)
+    )
+    row = cursor.fetchone()
+    if row:
+        return {
+            "video_codec": row[0],
+            "audio_codec": row[1],
+            "is_hevc": bool(row[2]),
+            "status": row[3]
+        }
+    return None
+
+
+def cache_file(conn: sqlite3.Connection, filepath: str, size: int, mtime: float,
+               video_codec: str, audio_codec: str, is_hevc: bool):
+    """Cache file info."""
+    p = Path(filepath)
+    now = datetime.now().timestamp()
+
+    # Check if file already exists (to preserve first_seen)
+    cursor = conn.execute("SELECT first_seen FROM files WHERE path = ?", (filepath,))
+    row = cursor.fetchone()
+    first_seen = row[0] if row else now
+
+    conn.execute("""
+        INSERT OR REPLACE INTO files
+        (path, filename, directory, original_size, mtime, video_codec, audio_codec,
+         is_hevc, first_seen, scanned_at, status)
+        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
+                COALESCE((SELECT status FROM files WHERE path = ?), 'pending'))
+    """, (filepath, p.name, str(p.parent), size, mtime, video_codec, audio_codec,
+          int(is_hevc), first_seen, now, filepath))
+    conn.commit()
+
+
+def update_transcode_result(conn: sqlite3.Connection, filepath: str, new_path: str,
+                            new_size: int, duration_secs: float, settings: dict,
+                            success: bool, failure_reason: str = None):
+    """Update database after transcode attempt."""
+    now = datetime.now().timestamp()
+
+    if success:
+        cursor = conn.execute("SELECT original_size FROM files WHERE path = ?", (filepath,))
+        row = cursor.fetchone()
+        original_size = row[0] if row else 0
+
+        space_saved = original_size - new_size
+        compression_ratio = new_size / original_size if original_size > 0 else 0
+
+        conn.execute("""
+            UPDATE files SET
+                status = 'done',
+                transcoded_at = ?,
+                transcoded_size = ?,
+                transcode_duration_secs = ?,
+                transcode_settings = ?,
+                space_saved = ?,
+                compression_ratio = ?,
+                failure_reason = NULL
+            WHERE path = ?
+ """, (now, new_size, duration_secs, json.dumps(settings), + space_saved, compression_ratio, filepath)) + + # Update running totals + conn.execute(""" + UPDATE stats SET + total_files_transcoded = total_files_transcoded + 1, + total_space_saved = total_space_saved + ?, + total_transcode_time_secs = total_transcode_time_secs + ?, + last_updated = ? + WHERE id = 1 + """, (space_saved, duration_secs, now)) + else: + conn.execute(""" + UPDATE files SET + status = 'failed', + failure_reason = ? + WHERE path = ? + """, (failure_reason, filepath)) + + conn.commit() + + +def remove_from_cache(conn: sqlite3.Connection, filepath: str): + """Remove a file from the cache.""" + conn.execute("DELETE FROM files WHERE path = ?", (filepath,)) + conn.commit() + + +def get_cache_stats(conn: sqlite3.Connection) -> dict: + """Get comprehensive stats from the cache.""" + # File counts and sizes + cursor = conn.execute(""" + SELECT + COUNT(*) as total, + SUM(CASE WHEN is_hevc = 1 THEN 1 ELSE 0 END) as hevc_count, + SUM(CASE WHEN is_hevc = 0 AND status = 'pending' THEN 1 ELSE 0 END) as pending_count, + SUM(CASE WHEN status = 'done' THEN 1 ELSE 0 END) as done_count, + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_count, + SUM(CASE WHEN is_hevc = 1 THEN original_size ELSE 0 END) as hevc_size, + SUM(CASE WHEN is_hevc = 0 AND status = 'pending' THEN original_size ELSE 0 END) as pending_size + FROM files + """) + row = cursor.fetchone() + file_stats = { + "total": row[0] or 0, + "hevc_count": row[1] or 0, + "pending_count": row[2] or 0, + "done_count": row[3] or 0, + "failed_count": row[4] or 0, + "hevc_size": row[5] or 0, + "pending_size": row[6] or 0, + } + + # Running totals + cursor = conn.execute("SELECT * FROM stats WHERE id = 1") + row = cursor.fetchone() + if row: + file_stats["total_transcoded"] = row[1] or 0 + file_stats["total_space_saved"] = row[2] or 0 + file_stats["total_transcode_time"] = row[3] or 0 + + # By directory breakdown + cursor = conn.execute(""" + SELECT directory, + COUNT(*) as count, + SUM(CASE WHEN is_hevc = 0 AND status = 'pending' THEN original_size ELSE 0 END) as pending_size + FROM files + GROUP BY directory + ORDER BY pending_size DESC + LIMIT 10 + """) + file_stats["by_directory"] = cursor.fetchall() + + return file_stats + +# Ensure log directory exists (skip for /var/log which requires root) +if not CONFIG["log_dir"].startswith("/var/"): + Path(CONFIG["log_dir"]).mkdir(parents=True, exist_ok=True) + +# Set up logging +log_level = logging.DEBUG if VERBOSE else logging.INFO +logging.basicConfig( + level=log_level, + format='%(asctime)s [%(levelname)s] %(message)s', + handlers=[ + logging.FileHandler(CONFIG["log_file"]), + logging.StreamHandler(sys.stdout) + ] +) +log = logging.getLogger(__name__) + + +def log_json(event: str, **data): + """Write structured JSON log entry (for machine parsing).""" + if not JSON_LOG: + return + entry = { + "ts": datetime.now().isoformat(), + "event": event, + **data + } + print(json.dumps(entry), file=sys.stderr) + + +def format_bytes(size: int) -> str: + """Human-readable byte size.""" + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if abs(size) < 1024.0: + return f"{size:.2f} {unit}" + size /= 1024.0 + return f"{size:.2f} PB" + + +def format_duration(seconds: float) -> str: + """Human-readable duration.""" + if seconds < 60: + return f"{seconds:.1f}s" + elif seconds < 3600: + return f"{seconds/60:.1f}m" + else: + return f"{seconds/3600:.1f}h" + + +def get_disk_space(path: str) -> dict: + """Get disk space info for a path.""" + try: + 
+        stat = os.statvfs(path)
+        total = stat.f_blocks * stat.f_frsize
+        free = stat.f_bavail * stat.f_frsize
+        used = total - free
+        return {
+            "total": total,
+            "free": free,
+            "used": used,
+            "percent_used": (used / total * 100) if total > 0 else 0
+        }
+    except Exception as e:
+        log.warning(f"Failed to get disk space for {path}: {e}")
+        return None
+
+
+def check_disk_space():
+    """Log disk space for all source directories."""
+    log.info("=" * 50)
+    log.info("Disk Space Check")
+    log.info("=" * 50)
+
+    checked_mounts = set()
+    for source_dir in CONFIG["source_dirs"]:
+        # Get mount point to avoid duplicate checks
+        try:
+            mount = subprocess.run(
+                ["df", "--output=target", source_dir],
+                capture_output=True, text=True
+            ).stdout.strip().split('\n')[-1]
+        except:
+            mount = source_dir
+
+        if mount in checked_mounts:
+            continue
+        checked_mounts.add(mount)
+
+        space = get_disk_space(source_dir)
+        if space:
+            log.info(f"  {mount}:")
+            log.info(f"    Total: {format_bytes(space['total'])}")
+            log.info(f"    Free:  {format_bytes(space['free'])} ({100-space['percent_used']:.1f}% available)")
+            log_json("disk_space", mount=mount, **space)
+
+            if space['percent_used'] > 90:
+                log.warning(f"    ⚠️ LOW DISK SPACE on {mount}!")
+    log.info("")
+
+
+def get_video_duration(filepath: str) -> Optional[float]:
+    """Get video duration in seconds using ffprobe."""
+    try:
+        result = subprocess.run([
+            "ffprobe", "-v", "quiet",
+            "-show_entries", "format=duration",
+            "-of", "json",
+            filepath
+        ], capture_output=True, text=True, timeout=30)
+
+        if result.returncode == 0:
+            data = json.loads(result.stdout)
+            return float(data.get("format", {}).get("duration", 0))
+    except Exception as e:
+        log.debug(f"Failed to get duration for {filepath}: {e}")
+    return None
+
+
+def handle_interrupt(signum, frame):
+    """Handle SIGINT/SIGTERM gracefully."""
+    log.warning("=" * 50)
+    log.warning("INTERRUPT RECEIVED - cleaning up...")
+    log.warning("=" * 50)
+
+    elapsed = None  # Defined up front so the log_json call below is always safe
+    if CURRENT_TRANSCODE["path"]:
+        log.warning(f"Interrupted transcode: {CURRENT_TRANSCODE['path']}")
+        if CURRENT_TRANSCODE["start_time"]:
+            elapsed = (datetime.now() - CURRENT_TRANSCODE["start_time"]).total_seconds()
+            log.warning(f"Elapsed time: {format_duration(elapsed)}")
+
+        # Try to kill ffmpeg if running
+        if CURRENT_TRANSCODE["pid"]:
+            try:
+                os.kill(CURRENT_TRANSCODE["pid"], signal.SIGTERM)
+                log.info("Sent SIGTERM to ffmpeg process")
+            except:
+                pass
+
+        # Clean up partial output
+        temp_path = Path(CURRENT_TRANSCODE["path"]).with_suffix(".transcoding.mkv")
+        if temp_path.exists():
+            log.info(f"Removing partial output: {temp_path}")
+            temp_path.unlink()
+
+    log_json("interrupted",
+             file=CURRENT_TRANSCODE["path"],
+             elapsed=elapsed if CURRENT_TRANSCODE["start_time"] else None)
+    sys.exit(1)
+
+
+# Register signal handlers
+signal.signal(signal.SIGINT, handle_interrupt)
+signal.signal(signal.SIGTERM, handle_interrupt)
+
+
+def get_video_codec(filepath: str) -> Optional[str]:
+    """Get the video codec of a file using ffprobe."""
+    try:
+        result = subprocess.run([
+            "ffprobe", "-v", "quiet",
+            "-select_streams", "v:0",
+            "-show_entries", "stream=codec_name",
+            "-of", "json",
+            filepath
+        ], capture_output=True, text=True, timeout=30)
+
+        if result.returncode == 0:
+            data = json.loads(result.stdout)
+            streams = data.get("streams", [])
+            if streams:
+                return streams[0].get("codec_name", "").lower()
+    except Exception as e:
+        log.warning(f"Failed to probe {filepath}: {e}")
+    return None
+
+
+def is_hevc(filepath: str) -> bool:
+    """Check if a file is already HEVC encoded."""
+    codec = get_video_codec(filepath)
+    return codec in ["hevc", "h265"]
+
+
+def get_file_size_gb(filepath: str) -> float:
+    """Get file size in GB."""
+    return os.path.getsize(filepath) / (1024 ** 3)
+
+
+def find_videos_to_transcode(conn: sqlite3.Connection) -> List[Dict]:
+    """Find all non-HEVC videos, sorted by size (biggest first). Uses cache for speed."""
+    videos = []
+    cache_hits = 0
+    cache_misses = 0
+    files_scanned = 0
+    dirs_scanned = 0
+
+    scan_start = datetime.now()
+    log.info("Scanning source directories...")
+
+    for source_dir in CONFIG["source_dirs"]:
+        if not os.path.exists(source_dir):
+            log.warning(f"  ⚠ Source directory not found: {source_dir}")
+            continue
+
+        dir_start = datetime.now()
+        dir_files = 0
+        dir_size = 0
+        log.info(f"  📁 Scanning: {source_dir}")
+
+        for root, dirs, files in os.walk(source_dir):
+            # Skip cleanup directory
+            if ".cleanup" in root:
+                continue
+
+            dirs_scanned += 1
+
+            for filename in files:
+                ext = os.path.splitext(filename)[1].lower()
+                if ext not in CONFIG["video_extensions"]:
+                    continue
+
+                filepath = os.path.join(root, filename)
+                files_scanned += 1
+                dir_files += 1
+
+                try:
+                    stat = os.stat(filepath)
+                    size = stat.st_size
+                    mtime = stat.st_mtime
+                    dir_size += size
+
+                    # Check cache first
+                    cached = get_cached_file(conn, filepath, size, mtime)
+                    if cached:
+                        video_codec = cached["video_codec"]
+                        audio_codec = cached["audio_codec"]
+                        is_hevc = cached["is_hevc"]
+                        status = cached["status"]
+                        cache_hits += 1
+                    else:
+                        # Need to probe this file
+                        log.debug(f"    Probing: {filename}")
+                        video_codec = get_video_codec(filepath)
+                        audio_codec = get_audio_codec(filepath)
+                        is_hevc = video_codec in ["hevc", "h265"] if video_codec else False
+                        status = "done" if is_hevc else "pending"
+                        cache_file(conn, filepath, size, mtime,
+                                   video_codec or "unknown", audio_codec or "unknown", is_hevc)
+                        cache_misses += 1
+
+                    videos.append({
+                        "path": filepath,
+                        "size": size,
+                        "size_gb": size / (1024 ** 3),
+                        "video_codec": video_codec,
+                        "audio_codec": audio_codec,
+                        "is_hevc": is_hevc,
+                        "status": status
+                    })
+                except OSError as e:
+                    log.warning(f"  ⚠ Cannot access {filepath}: {e}")
+
+        dir_elapsed = (datetime.now() - dir_start).total_seconds()
+        log.info(f"    Found {dir_files} videos ({format_bytes(dir_size)}) in {format_duration(dir_elapsed)}")
+        log_json("scan_directory",
+                 directory=source_dir,
+                 files=dir_files,
+                 size=dir_size,
+                 elapsed=dir_elapsed)
+
+    scan_elapsed = (datetime.now() - scan_start).total_seconds()
+    cache_rate = (cache_hits / (cache_hits + cache_misses) * 100) if (cache_hits + cache_misses) > 0 else 0
+
+    log.info("")
+    log.info(f"Scan complete in {format_duration(scan_elapsed)}")
+    log.info(f"  Files scanned:  {files_scanned}")
+    log.info(f"  Dirs traversed: {dirs_scanned}")
+    log.info(f"  Cache hits:     {cache_hits} ({cache_rate:.1f}%)")
+    log.info(f"  Cache misses:   {cache_misses} (probed)")
+    log.info("")
+
+    log_json("scan_complete",
+             files=files_scanned,
+             dirs=dirs_scanned,
+             cache_hits=cache_hits,
+             cache_misses=cache_misses,
+             elapsed=scan_elapsed)
+
+    # Sort by size descending (biggest first)
+    videos.sort(key=lambda x: x["size"], reverse=True)
+    return videos
+
+
+def is_past_cutoff() -> bool:
+    """Check if we're past the cutoff time for starting new encodes."""
+    now = datetime.now().time()
+    return now >= CONFIG["cutoff_time"]
+
+
+def verify_output(filepath: str, min_size_ratio: float = 0.1) -> bool:
+    """Verify the transcoded file is valid."""
+    if not os.path.exists(filepath):
+        return False
+
+    # Check file size is reasonable (at least 10% of some minimum)
+    size = os.path.getsize(filepath)
+    if size < 1024 * 1024:  # Less than 1MB is definitely wrong
+        return False
+
+    # Check it's valid video with ffprobe
+    codec = get_video_codec(filepath)
+    if codec != "hevc":
+        return False
+
+    return True
+
+
+def transcode_file(input_path: str, conn: sqlite3.Connection) -> Optional[dict]:
+    """Transcode a single file to HEVC. Returns result dict on success."""
+
+    input_file = Path(input_path)
+    output_path = input_file.with_suffix(".transcoding.mkv")
+    final_path = input_file.with_suffix(".mkv")
+
+    original_size = os.path.getsize(input_path)
+    video_duration = get_video_duration(input_path)
+
+    log.info("")
+    log.info("=" * 60)
+    log.info(f"TRANSCODE START: {input_file.name}")
+    log.info("=" * 60)
+    log.info(f"  Input path:    {input_path}")
+    log.info(f"  Output path:   {output_path}")
+    log.info(f"  Original size: {format_bytes(original_size)}")
+    if video_duration:
+        log.info(f"  Duration:      {format_duration(video_duration)}")
+
+    log_json("transcode_start",
+             file=input_path,
+             size=original_size,
+             duration=video_duration)
+
+    # Mark as transcoding in DB
+    conn.execute("UPDATE files SET status = 'transcoding' WHERE path = ?", (input_path,))
+    conn.commit()
+
+    start_time = datetime.now()
+    CURRENT_TRANSCODE["path"] = input_path
+    CURRENT_TRANSCODE["start_time"] = start_time
+
+    settings = {
+        "crf": CONFIG["crf"],
+        "preset": CONFIG["preset"],
+        "audio_codec": CONFIG["audio_codec"],
+        "audio_bitrate": CONFIG["audio_bitrate"],
+        "threads": CONFIG["threads"]
+    }
+
+    # Build ffmpeg command
+    cmd = [
+        "ffmpeg", "-y",
+        "-threads", str(CONFIG["threads"]),
+        "-progress", "pipe:1",  # Progress to stdout for parsing
+        "-i", input_path,
+        "-c:v", "libx265",
+        "-preset", CONFIG["preset"],
+        "-crf", str(CONFIG["crf"]),
+        "-x265-params", f"pools={CONFIG['threads']}",
+        "-c:a", CONFIG["audio_codec"],
+        "-b:a", CONFIG["audio_bitrate"],
+        "-c:s", "copy",  # Copy subtitles
+        str(output_path)
+    ]
+
+    # Log the full command for debugging
+    cmd_str = ' '.join(f'"{c}"' if ' ' in c else c for c in cmd)
+    log.info(f"  FFmpeg command:")
+    log.debug(f"    {cmd_str}")
+    if not VERBOSE:
+        log.info(f"    (use --verbose to see full command)")
+    log.info("")
+    log_json("ffmpeg_command", command=cmd_str)
+
+    # Log stderr to file to prevent pipe buffer deadlock
+    stderr_log_path = Path(CONFIG["log_dir"]) / "transcoder_ffmpeg_stderr.log"
+    stderr_file = open(stderr_log_path, "a")
+    stderr_file.write(f"\n{'='*60}\n{datetime.now()} - {input_path}\n{'='*60}\n")
+    stderr_file.flush()
+
+    try:
+        # Use Popen for progress monitoring
+        # stderr goes to file (not PIPE) to prevent buffer deadlock
+        process = subprocess.Popen(
+            cmd,
+            stdout=subprocess.PIPE,
+            stderr=stderr_file,
+            text=True
+        )
+        CURRENT_TRANSCODE["pid"] = process.pid
+        log.debug(f"  FFmpeg PID: {process.pid}")
+
+        # Monitor progress
+        last_progress_log = datetime.now()
+        progress_interval = 60  # Log progress every 60 seconds
+        current_time_us = 0
+
+        import select
+        last_output_size = 0
+        last_output_growth = datetime.now()
+        stall_timeout = 600  # 10 minutes with no output growth = stalled
+
+        while True:
+            # Use select for non-blocking read with timeout
+            ready, _, _ = select.select([process.stdout], [], [], 30)
+
+            if ready:
+                line = process.stdout.readline()
+                if not line and process.poll() is not None:
+                    break
+
+                # Parse progress output
+                if line.startswith("out_time_us="):
+                    try:
+                        current_time_us = int(line.split("=")[1])
+                    except:
+                        pass
+            elif process.poll() is not None:
+                break
+
+            # Log progress periodically
+            now = datetime.now()
+            if (now - last_progress_log).total_seconds() >= progress_interval:
+                elapsed = (now - start_time).total_seconds()
+
+                if video_duration and current_time_us > 0:
+                    current_secs = current_time_us / 1_000_000
+                    percent = (current_secs / video_duration) * 100
+                    eta_secs = (elapsed / percent * 100) - elapsed if percent > 0 else 0
+
+                    log.info(f"  ⏳ Progress: {percent:.1f}% | Elapsed: {format_duration(elapsed)} | ETA: {format_duration(eta_secs)}")
+                    log_json("transcode_progress",
+                             file=input_path,
+                             percent=percent,
+                             elapsed=elapsed,
+                             eta=eta_secs)
+                else:
+                    log.info(f"  ⏳ Elapsed: {format_duration(elapsed)}")
+
+                # Check output size growth and detect stalls
+                if output_path.exists():
+                    current_size = os.path.getsize(output_path)
+                    log.info(f"     Output size: {format_bytes(current_size)}")
+
+                    if current_size > last_output_size:
+                        last_output_size = current_size
+                        last_output_growth = now
+                    elif (now - last_output_growth).total_seconds() > stall_timeout:
+                        log.error(f"  🛑 STALL DETECTED: Output file hasn't grown in {stall_timeout}s")
+                        log.error(f"     Output stuck at {format_bytes(current_size)}")
+                        log.error(f"     Killing ffmpeg (PID {process.pid})")
+                        process.kill()
+                        process.wait(timeout=30)
+
+                        # Mark as stalled in DB with timestamp for 7-day cooldown
+                        conn.execute(
+                            "UPDATE files SET status = 'stalled', failure_reason = ?, transcoded_at = ? WHERE path = ?",
+                            (f"Output stalled at {format_bytes(current_size)} after {format_duration(elapsed)}",
+                             datetime.now().timestamp(), input_path)
+                        )
+                        conn.commit()
+
+                        # Clean up partial output
+                        if output_path.exists():
+                            output_path.unlink()
+                            log.info(f"     Cleaned up partial file: {output_path}")
+
+                        stderr_file.close()
+                        return None
+
+                last_progress_log = now
+
+        # Wait for process to finish (stderr already going to file)
+        process.wait(timeout=60)
+        stderr_file.close()
+
+        if process.returncode != 0:
+            # Read last 1000 chars of stderr log for error reporting
+            try:
+                with open(stderr_log_path, 'r') as f:
+                    f.seek(max(0, os.path.getsize(stderr_log_path) - 1000))
+                    error_msg = f.read()
+            except:
+                error_msg = "Unknown error (check transcoder_ffmpeg_stderr.log)"
+            log.error("=" * 60)
+            log.error("TRANSCODE FAILED")
+            log.error("=" * 60)
+            log.error(f"  File: {input_path}")
+            log.error(f"  Exit code: {process.returncode}")
+            log.error(f"  FFmpeg error output:")
+            for line in error_msg.split('\n')[-20:]:  # Last 20 lines
+                if line.strip():
+                    log.error(f"    {line}")
+            log.error("=" * 60)
+
+            log_json("transcode_failed",
+                     file=input_path,
+                     exit_code=process.returncode,
+                     error=error_msg[-500:])
+
+            if output_path.exists():
+                output_path.unlink()
+            update_transcode_result(conn, input_path, None, 0, 0, settings, False, error_msg[-500:])
+            CURRENT_TRANSCODE["path"] = None
+            return None
+
+    except subprocess.TimeoutExpired:
+        log.error("=" * 60)
+        log.error(f"TRANSCODE TIMEOUT (8 hours): {input_path}")
+        log.error("=" * 60)
+        log_json("transcode_timeout", file=input_path)
+        process.kill()
+        if output_path.exists():
+            output_path.unlink()
+        update_transcode_result(conn, input_path, None, 0, 0, settings, False, "Timeout (8 hours)")
+        CURRENT_TRANSCODE["path"] = None
+        return None
+    except Exception as e:
+        log.error("=" * 60)
+        log.error(f"TRANSCODE ERROR: {input_path}")
+        log.error("=" * 60)
+        log.error(f"  Exception: {type(e).__name__}: {e}")
+        log.error(f"  Traceback:")
+        for line in traceback.format_exc().split('\n'):
+            if line.strip():
+                log.error(f"    {line}")
+        log.error("=" * 60)
+
+        log_json("transcode_error",
+                 file=input_path,
+                 error=str(e),
+                 traceback=traceback.format_exc())
+
+        if output_path.exists():
+            output_path.unlink()
+        update_transcode_result(conn, input_path, None, 0, 0, settings, False, str(e))
+        CURRENT_TRANSCODE["path"] = None
+        return None
+
+    # Verify output
+    log.info("  Verifying output file...")
+    if not verify_output(str(output_path)):
+        log.error("=" * 60)
+        log.error("OUTPUT VERIFICATION FAILED")
+        log.error("=" * 60)
+        log.error(f"  File: {output_path}")
+        if output_path.exists():
+            log.error(f"  Output size: {format_bytes(os.path.getsize(output_path))}")
+            output_codec = get_video_codec(str(output_path))
+            log.error(f"  Output codec: {output_codec}")
+        else:
+            log.error("  Output file does not exist!")
+        log.error("=" * 60)
+
+        log_json("transcode_verify_failed", file=input_path)
+        if output_path.exists():
+            output_path.unlink()
+        update_transcode_result(conn, input_path, None, 0, 0, settings, False, "Output verification failed")
+        CURRENT_TRANSCODE["path"] = None
+        return None
+
+    new_size = os.path.getsize(output_path)
+    duration_secs = (datetime.now() - start_time).total_seconds()
+    space_saved = original_size - new_size
+    compression_ratio = new_size / original_size if original_size > 0 else 0
+    encode_speed = video_duration / duration_secs if video_duration and duration_secs > 0 else 0
+
+    log.info("")
+    log.info("=" * 60)
+    log.info("✅ TRANSCODE COMPLETE")
+    log.info("=" * 60)
+    log.info(f"  File:     {input_file.name}")
+    log.info(f"  Duration: {format_duration(duration_secs)}")
+    log.info(f"  Original: {format_bytes(original_size)}")
+    log.info(f"  New size: {format_bytes(new_size)}")
+    log.info(f"  Saved:    {format_bytes(space_saved)} ({100*(1-compression_ratio):.1f}% reduction)")
+    log.info(f"  Ratio:    {compression_ratio:.2f}x")
+    if encode_speed > 0:
+        log.info(f"  Speed:    {encode_speed:.2f}x realtime")
+    log.info("=" * 60)
+    log.info("")
+
+    log_json("transcode_complete",
+             file=input_path,
+             original_size=original_size,
+             new_size=new_size,
+             space_saved=space_saved,
+             duration_secs=duration_secs,
+             compression_ratio=compression_ratio,
+             encode_speed=encode_speed)
+
+    # Move original to cleanup directory
+    cleanup_dir = Path(CONFIG["cleanup_dir"])
+    cleanup_dir.mkdir(parents=True, exist_ok=True)
+
+    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+    cleanup_name = f"{timestamp}_{input_file.name}"
+    cleanup_path = cleanup_dir / cleanup_name
+
+    log.info(f"  Moving original to cleanup: {cleanup_path}")
+    try:
+        shutil.move(input_path, cleanup_path)
+        log.info(f"  ✓ Original moved successfully")
+        log_json("original_moved", source=input_path, dest=str(cleanup_path))
+    except Exception as e:
+        log.error(f"  ✗ Failed to move original: {e}")
+        log.error(f"    Traceback: {traceback.format_exc()}")
+        # Don't fail the whole operation, but keep both files
+
+    # Rename output to final name
+    log.info(f"  Renaming output: {output_path} -> {final_path}")
+    try:
+        output_path.rename(final_path)
+        log.info(f"  ✓ Output renamed successfully")
+    except Exception as e:
+        log.error(f"  ✗ Failed to rename output: {e}")
+        final_path = output_path
+
+    # Fix permissions - mediaserver group
+    log.debug("  Setting permissions (mediaserver:664)")
+    try:
+        subprocess.run(["chgrp", "mediaserver", str(final_path)], check=False)
+        subprocess.run(["chmod", "664", str(final_path)], check=False)
+        log.debug("  ✓ Permissions set")
+    except Exception as e:
+        log.warning(f"  ✗ Failed to set permissions: {e}")
+
+    # Update database with success
+    update_transcode_result(conn, input_path, str(final_path), new_size, duration_secs, settings, True)
+
+    CURRENT_TRANSCODE["path"] = None
+    return {
+        "output_path": str(final_path),
+        "new_size": new_size,
+        "duration_secs": duration_secs,
+        "space_saved": space_saved
+    }
+
+
+def cleanup_old_files():
+    """Delete original files older than cleanup_days."""
+    cleanup_dir = Path(CONFIG["cleanup_dir"])
+    if not cleanup_dir.exists():
+        log.debug("Cleanup directory doesn't exist, skipping cleanup")
+        return
+
+    log.info("Checking cleanup directory for old originals...")
+    cutoff = datetime.now().timestamp() - (CONFIG["cleanup_days"] * 24 * 3600)
+    cutoff_date = datetime.fromtimestamp(cutoff).strftime('%Y-%m-%d %H:%M')
+    log.debug(f"  Cutoff date: {cutoff_date} ({CONFIG['cleanup_days']} days ago)")
+
+    deleted_count = 0
+    deleted_size = 0
+    kept_count = 0
+    kept_size = 0
+
+    for filepath in cleanup_dir.iterdir():
+        if filepath.is_file():
+            mtime = filepath.stat().st_mtime
+            size = filepath.stat().st_size
+
+            if mtime < cutoff:
+                try:
+                    filepath.unlink()
+                    deleted_count += 1
+                    deleted_size += size
+                    log.info(f"  🗑️ Deleted: {filepath.name} ({format_bytes(size)})")
+                    log_json("cleanup_deleted", file=str(filepath), size=size)
+                except Exception as e:
+                    log.warning(f"  ⚠ Failed to delete {filepath}: {e}")
+            else:
+                kept_count += 1
+                kept_size += size
+                days_old = (datetime.now().timestamp() - mtime) / 86400
+                log.debug(f"  ⏳ Keeping: {filepath.name} ({days_old:.1f} days old)")
+
+    if deleted_count > 0:
+        log.info(f"  Cleanup: deleted {deleted_count} files, freed {format_bytes(deleted_size)}")
+        log_json("cleanup_complete", deleted=deleted_count, freed=deleted_size)
+    else:
+        log.info(f"  Cleanup: nothing to delete ({kept_count} files still in retention)")
+
+    if kept_count > 0:
+        log.debug(f"  Retention: {kept_count} files ({format_bytes(kept_size)}) waiting")
+
+
+def main():
+    # Initialize database
+    conn = init_db()
+
+    # Handle --stats flag
+    if args.stats:
+        stats = get_cache_stats(conn)
+        print("=" * 60)
+        print("Transcoder Statistics")
+        print("=" * 60)
+        print(f"Total files scanned:  {stats['total']}")
+        print(f"Already HEVC:         {stats['hevc_count']} ({stats['hevc_size']/(1024**3):.2f} GB)")
+        print(f"Pending transcode:    {stats['pending_count']} ({stats['pending_size']/(1024**3):.2f} GB)")
+        print(f"Completed:            {stats['done_count']}")
+        print(f"Failed:               {stats['failed_count']}")
+        print("")
+        print("--- Lifetime Stats ---")
+        print(f"Total transcoded:     {stats.get('total_transcoded', 0)} files")
+        print(f"Total space saved:    {stats.get('total_space_saved', 0)/(1024**3):.2f} GB")
+        total_time = stats.get('total_transcode_time', 0)
+        print(f"Total transcode time: {total_time/3600:.1f} hours")
+        if stats['pending_count'] > 0:
+            est_savings = stats['pending_size'] * 0.5
+            print(f"\nEstimated remaining: ~{est_savings/(1024**3):.2f} GB savings")
+
+        if stats.get('by_directory'):
+            print("\n--- Top Directories (by pending size) ---")
+            for dir_path, count, pending in stats['by_directory'][:5]:
+                if pending > 0:
+                    # Shorten path for display
+                    short = dir_path.replace("/disks/Plex/", "")
+                    print(f"  {short}: {pending/(1024**3):.2f} GB pending")
+
+        print("=" * 60)
+        conn.close()
+        return
+
+    # Handle --clear-cache flag
+    if args.clear_cache:
+        conn.execute("DELETE FROM files")
+        conn.execute("UPDATE stats SET total_files_transcoded=0, total_space_saved=0, total_transcode_time_secs=0")
+        conn.commit()
+        print("Cache cleared.")
+        conn.close()
+        return
+
+    # Handle --failed flag
+    if args.failed:
+        cursor = conn.execute("""
+            SELECT path, original_size, failure_reason
+            FROM files WHERE status = 'failed'
+            ORDER BY original_size DESC
+        """)
+        rows = cursor.fetchall()
+        print("=" * 60)
+        print(f"Failed Transcodes ({len(rows)} files)")
+        print("=" * 60)
+        for path, size, reason in rows:
+            print(f"\n{path}")
+            print(f"  Size:   {size/(1024**3):.2f} GB")
+            print(f"  Reason: {reason}")
+        if not rows:
+            print("No failed transcodes!")
+        print("=" * 60)
+        conn.close()
+        return
+
+    # Handle --retry-failed flag
+    if args.retry_failed:
+        cursor = conn.execute("UPDATE files SET status = 'pending', failure_reason = NULL WHERE status = 'failed'")
+        count = cursor.rowcount
+        conn.commit()
+        print(f"Reset {count} failed files to pending.")
+        conn.close()
+        return
+
+    session_start = datetime.now()
+
+    log.info("")
+    log.info("=" * 70)
+    log.info("  OVERNIGHT VIDEO TRANSCODER")
+    log.info("=" * 70)
+    log.info(f"  Started:  {session_start.strftime('%Y-%m-%d %H:%M:%S')}")
+    log.info(f"  Mode:     {'DRY RUN' if DRY_RUN else 'LIVE'}")
+    log.info(f"  Verbose:  {'YES' if VERBOSE else 'NO'}")
+    log.info(f"  JSON log: {'YES' if JSON_LOG else 'NO'}")
+    log.info(f"  Cutoff:   {CONFIG['cutoff_time']}")
+    log.info(f"  Settings: CRF={CONFIG['crf']}, preset={CONFIG['preset']}, audio={CONFIG['audio_codec']}@{CONFIG['audio_bitrate']}")
+    log.info("=" * 70)
+    log.info("")
+
+    log_json("session_start",
+             mode="dry_run" if DRY_RUN else "live",
+             config=CONFIG)
+
+    # Check disk space first
+    check_disk_space()
+
+    # Run cleanup first (skip in dry run)
+    if not DRY_RUN:
+        cleanup_old_files()
+
+    # Find videos to transcode
+    videos = find_videos_to_transcode(conn)
+
+    # Filter to non-HEVC pending only (using cached codec info)
+    to_transcode = []
+    hevc_count = 0
+    hevc_size = 0
+
+    for video in videos:
+        if video.get("is_hevc", False) or video.get("status") == "done":
+            hevc_count += 1
+            hevc_size += video["size"]
+        elif video.get("status") == "pending":
+            to_transcode.append(video)
+
+    log.info(f"Already HEVC: {hevc_count} files ({hevc_size / (1024**3):.2f} GB)")
+    log.info(f"Need transcoding: {len(to_transcode)} files")
+
+    if DRY_RUN:
+        log.info("")
+        log.info("=" * 60)
+        log.info("DRY RUN - Files that would be transcoded (biggest first):")
+        log.info("=" * 60)
+
+        total_size = 0
+        estimated_savings = 0
+
+        for i, video in enumerate(to_transcode[:20], 1):  # Show top 20
+            size_gb = video["size_gb"]
+            total_size += video["size"]
+            # Estimate ~50% compression for HEVC
+            est_saved = video["size"] * 0.5
+            estimated_savings += est_saved
+
+            log.info(f"{i:2}. [{size_gb:6.2f} GB] {video['path']}")
+
+        if len(to_transcode) > 20:
+            log.info(f"    ... and {len(to_transcode) - 20} more files")
+
+        log.info("")
+        log.info("=" * 60)
+        log.info(f"Total to transcode: {total_size / (1024**3):.2f} GB")
+        log.info(f"Estimated savings (~50%): {estimated_savings / (1024**3):.2f} GB")
+        log.info("=" * 60)
+        return
+
+    # Normal run
+    transcoded_count = 0
+    total_saved = 0
+    failed_count = 0
+    skipped_count = 0
+
+    log.info("")
+    log.info("=" * 60)
+    log.info("Starting transcode queue")
+    log.info("=" * 60)
+    log.info(f"  Files to process: {len(to_transcode)}")
+    log.info(f"  Total size: {format_bytes(sum(v['size'] for v in to_transcode))}")
+    log.info("")
+
+    for i, video in enumerate(to_transcode, 1):
+        # Check cutoff time
+        if is_past_cutoff():
+            remaining = len(to_transcode) - i + 1
+            log.info("")
+            log.info("=" * 60)
+            log.info(f"⏰ CUTOFF TIME REACHED ({CONFIG['cutoff_time']})")
+            log.info(f"   Stopping with {remaining} files remaining in queue")
+            log.info("=" * 60)
+            log_json("cutoff_reached", remaining_files=remaining)
+            break
+
+        # Skip already done or failed
+        if video.get("status") in ["done", "failed"]:
+            log.debug(f"  Skipping (status={video['status']}): {video['path']}")
+            skipped_count += 1
+            continue
+
+        # Skip stalled files unless 7 days have passed
+        if video.get("status") == "stalled":
+            row = conn.execute(
+                "SELECT transcoded_at FROM files WHERE path = ?", (video['path'],)
+            ).fetchone()
+            if row and row[0]:
+                stalled_at = datetime.fromtimestamp(row[0])
+                days_since = (datetime.now() - stalled_at).days
+                if days_since < 7:
+                    log.debug(f"  Skipping stalled file ({days_since}d ago, retry after 7d): {video['path']}")
+                    skipped_count += 1
+                    continue
+                else:
+                    log.info(f"  Retrying stalled file ({days_since}d cooldown passed): {video['path']}")
+            else:
+                log.debug(f"  Skipping stalled file (no timestamp): {video['path']}")
+                skipped_count += 1
+                continue
+
+        log.info(f"[{i}/{len(to_transcode)}] Queued: {Path(video['path']).name}")
+        log.info(f"  Size:  {format_bytes(video['size'])}")
+        log.info(f"  Codec: video={video.get('video_codec', 'unknown')}, audio={video.get('audio_codec', 'unknown')}")
+        log.info(f"  Path:  {video['path']}")
+
+        log_json("transcode_queued",
+                 index=i,
+                 total=len(to_transcode),
+                 file=video['path'],
+                 size=video['size'],
+                 video_codec=video.get('video_codec'),
+                 audio_codec=video.get('audio_codec'))
+
+        # Transcode
+        result = transcode_file(video["path"], conn)
+
+        if result:
+            total_saved += result["space_saved"]
+            transcoded_count += 1
+            log.info(f"  Running totals: {transcoded_count} done, {format_bytes(total_saved)} saved")
+        else:
+            failed_count += 1
+            log.error(f"  ❌ Transcode #{i} failed: {video['path']}")
+            log.info(f"  Running totals: {transcoded_count} done, {failed_count} failed")
+
+    session_end = datetime.now()
+    session_duration = (session_end - session_start).total_seconds()
+
+    log.info("")
+    log.info("=" * 70)
+    log.info("  SESSION COMPLETE")
+    log.info("=" * 70)
+    log.info(f"  Started:     {session_start.strftime('%Y-%m-%d %H:%M:%S')}")
+    log.info(f"  Ended:       {session_end.strftime('%Y-%m-%d %H:%M:%S')}")
+    log.info(f"  Duration:    {format_duration(session_duration)}")
+    log.info(f"  Transcoded:  {transcoded_count} files")
+    log.info(f"  Failed:      {failed_count} files")
+    log.info(f"  Skipped:     {skipped_count} files")
+    log.info(f"  Space saved: {format_bytes(total_saved)}")
+
+    # Get updated lifetime stats
+    stats = get_cache_stats(conn)
+    log.info("")
+    log.info("  --- Lifetime Totals ---")
+    log.info(f"  Total transcoded: {stats.get('total_transcoded', 0)} files")
+    log.info(f"  Total saved:      {format_bytes(stats.get('total_space_saved', 0))}")
+    log.info(f"  Pending:          {stats.get('pending_count', 0)} files ({format_bytes(stats.get('pending_size', 0))})")
+    log.info("=" * 70)
+    log.info("")
+
+    log_json("session_complete",
+             started=session_start.isoformat(),
+             ended=session_end.isoformat(),
+             duration_secs=session_duration,
+             transcoded=transcoded_count,
+             space_saved=total_saved,
+             lifetime_transcoded=stats.get('total_transcoded', 0),
+             lifetime_saved=stats.get('total_space_saved', 0),
+             pending_count=stats.get('pending_count', 0))
+
+    conn.close()
+
+
+if __name__ == "__main__":
+    main()
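
The commit message mentions a stats page generator alongside this script. As a rough illustration of how such a consumer could read the cache this transcoder maintains, here is a minimal sketch (not part of this commit) that opens the SQLite database read-only and returns the lifetime totals from the `stats` table defined in `init_db()`. The database path is taken from `CONFIG["db_file"]` above; adjust it if your layout differs.

```python
#!/usr/bin/env python3
# Hypothetical companion sketch: read the lifetime totals that
# overnight_transcoder.py keeps in its single-row 'stats' table.
import sqlite3

DB_FILE = "/var/lib/transcoder/cache.db"  # matches CONFIG["db_file"] in the script


def read_lifetime_totals(db_file: str = DB_FILE) -> dict:
    """Return the running totals maintained by update_transcode_result()."""
    # Open read-only so a report generator can never disturb the transcoder's cache.
    conn = sqlite3.connect(f"file:{db_file}?mode=ro", uri=True)
    try:
        row = conn.execute(
            "SELECT total_files_transcoded, total_space_saved, "
            "total_transcode_time_secs, last_updated FROM stats WHERE id = 1"
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        return {}
    return {
        "files_transcoded": row[0] or 0,
        "space_saved_gb": (row[1] or 0) / (1024 ** 3),
        "transcode_hours": (row[2] or 0) / 3600,
        "last_updated": row[3],
    }


if __name__ == "__main__":
    print(read_lifetime_totals())
```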

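With `--json-log`, `log_json()` above emits one JSON object per line on stderr, each with `ts` and `event` keys plus event-specific fields. A downstream consumer, for example the system health reports mentioned in the commit message, could tally those events from a captured log file; the sketch below assumes the stream has been redirected to a file of your choosing (the path shown is only an example, not something this commit creates).

```python
# Hypothetical consumer sketch: tally events from a captured --json-log stream.
import json
from collections import Counter
from pathlib import Path


def summarise_events(log_path: str) -> Counter:
    """Count events ("transcode_complete", "transcode_failed", ...) one JSON object per line."""
    counts = Counter()
    for line in Path(log_path).read_text().splitlines():
        line = line.strip()
        if not line:
            continue
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip any interleaved plain-text logging
        counts[entry.get("event", "unknown")] += 1
    return counts


if __name__ == "__main__":
    # Example capture path only; e.g. run the script with
    # "--json-log" and redirect stderr (2>>) to this file from cron.
    print(summarise_events("/var/log/transcoder_events.jsonl"))
```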