#!/usr/bin/env python3
"""
Overnight Video Transcoder

Transcodes large video files to HEVC to save space.
Runs via cron between 02:00-07:00.

Usage:
    overnight_transcoder.py            # Normal run
    overnight_transcoder.py --dry      # Dry run - show what would be transcoded
    overnight_transcoder.py --stats    # Show statistics
    overnight_transcoder.py --verbose  # Extra debug output
"""

import os
import sys
import json
import select
import subprocess
import shutil
import logging
import argparse
import sqlite3
import traceback
import signal
from pathlib import Path
from datetime import datetime, time
from typing import Optional, List, Dict

# Parse arguments
parser = argparse.ArgumentParser(description="Overnight video transcoder")
parser.add_argument("--dry", action="store_true",
                    help="Dry run - don't transcode, just show what would happen")
parser.add_argument("--stats", action="store_true",
                    help="Show cache statistics and exit")
parser.add_argument("--failed", action="store_true",
                    help="Show failed transcodes and exit")
parser.add_argument("--retry-failed", action="store_true",
                    help="Reset failed files to pending for retry")
parser.add_argument("--clear-cache", action="store_true",
                    help="Clear the cache and exit")
parser.add_argument("--verbose", "-v", action="store_true",
                    help="Verbose debug output")
parser.add_argument("--json-log", action="store_true",
                    help="Output structured JSON logs (one per line)")
args = parser.parse_args()

DRY_RUN = args.dry
VERBOSE = args.verbose
JSON_LOG = args.json_log

# Track current transcode for signal handling
CURRENT_TRANSCODE = {"path": None, "pid": None, "start_time": None}

# Configuration
CONFIG = {
    "source_dirs": [
        "/disks/Plex/Anime",
        "/disks/Plex/Films",
        "/disks/Plex/TV",
    ],
    "cleanup_dir": "/disks/Plex/.cleanup",
    "log_dir": "/var/log",
    "log_file": "/var/log/transcoder.log",
    "db_file": "/var/lib/transcoder/cache.db",
    "video_extensions": [".mkv", ".mp4", ".avi", ".m4v", ".mov", ".wmv"],
    "cutoff_time": time(6, 30),  # Don't start new encodes after 06:30
    "cleanup_days": 7,           # Delete originals after 7 days
    # FFmpeg settings
    "crf": 20,
    "preset": "slow",
    "audio_codec": "ac3",
    "audio_bitrate": "640k",
    "threads": 12,
}


def init_db() -> sqlite3.Connection:
    """Initialize the SQLite database for caching file info."""
    db_path = Path(CONFIG["db_file"])
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(CONFIG["db_file"])

    # Main files table - tracks all video files
    conn.execute("""
        CREATE TABLE IF NOT EXISTS files (
            path TEXT PRIMARY KEY,
            filename TEXT,
            directory TEXT,
            original_size INTEGER,
            mtime REAL,
            video_codec TEXT,
            audio_codec TEXT,
            is_hevc INTEGER,
            first_seen REAL,
            scanned_at REAL,
            -- Transcode info (NULL if not transcoded)
            status TEXT DEFAULT 'pending',  -- pending, transcoding, done, failed, stalled
            transcoded_at REAL,
            transcoded_size INTEGER,
            transcode_duration_secs REAL,
            transcode_settings TEXT,  -- JSON: {crf, preset, audio_codec, audio_bitrate}
            failure_reason TEXT,
            -- Computed (updated after transcode)
            space_saved INTEGER,
            compression_ratio REAL
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_is_hevc ON files(is_hevc)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_status ON files(status)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_size ON files(original_size)")
    conn.execute("CREATE INDEX IF NOT EXISTS idx_directory ON files(directory)")

    # Stats summary table - running totals
    conn.execute("""
        CREATE TABLE IF NOT EXISTS stats (
            id INTEGER PRIMARY KEY CHECK (id = 1),  -- Single row
            total_files_transcoded INTEGER DEFAULT 0,
            total_space_saved INTEGER DEFAULT 0,
            total_transcode_time_secs REAL DEFAULT 0,
            last_updated REAL
        )
    """)
    conn.execute("INSERT OR IGNORE INTO stats (id) VALUES (1)")
    conn.commit()
    return conn
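
# The cache can be inspected out-of-band with the sqlite3 CLI. Two
# illustrative queries against the schema above (db path per CONFIG):
#
#   sqlite3 /var/lib/transcoder/cache.db \
#     "SELECT status, COUNT(*), SUM(original_size) FROM files GROUP BY status"
#   sqlite3 /var/lib/transcoder/cache.db \
#     "SELECT path, failure_reason FROM files WHERE status = 'failed' LIMIT 10"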

def get_audio_codec(filepath: str) -> Optional[str]:
    """Get the audio codec of a file using ffprobe."""
    try:
        result = subprocess.run([
            "ffprobe", "-v", "quiet",
            "-select_streams", "a:0",
            "-show_entries", "stream=codec_name",
            "-of", "json",
            filepath
        ], capture_output=True, text=True, timeout=30)
        if result.returncode == 0:
            data = json.loads(result.stdout)
            streams = data.get("streams", [])
            if streams:
                return streams[0].get("codec_name", "").lower()
    except Exception as e:
        log.warning(f"Failed to probe audio {filepath}: {e}")
    return None


def get_cached_file(conn: sqlite3.Connection, filepath: str, size: int,
                    mtime: float) -> Optional[dict]:
    """Get cached file info if file hasn't changed. Returns dict or None."""
    cursor = conn.execute(
        """SELECT video_codec, audio_codec, is_hevc, status
           FROM files WHERE path = ? AND original_size = ? AND mtime = ?""",
        (filepath, size, mtime)
    )
    row = cursor.fetchone()
    if row:
        return {
            "video_codec": row[0],
            "audio_codec": row[1],
            "is_hevc": bool(row[2]),
            "status": row[3]
        }
    return None


def cache_file(conn: sqlite3.Connection, filepath: str, size: int, mtime: float,
               video_codec: str, audio_codec: str, is_hevc: bool):
    """Cache file info."""
    p = Path(filepath)
    now = datetime.now().timestamp()

    # Check if file already exists (to preserve first_seen)
    cursor = conn.execute("SELECT first_seen FROM files WHERE path = ?", (filepath,))
    row = cursor.fetchone()
    first_seen = row[0] if row else now

    conn.execute("""
        INSERT OR REPLACE INTO files
            (path, filename, directory, original_size, mtime,
             video_codec, audio_codec, is_hevc, first_seen, scanned_at, status)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
                COALESCE((SELECT status FROM files WHERE path = ?), 'pending'))
    """, (filepath, p.name, str(p.parent), size, mtime,
          video_codec, audio_codec, int(is_hevc), first_seen, now, filepath))
    conn.commit()


def update_transcode_result(conn: sqlite3.Connection, filepath: str,
                            new_path: Optional[str], new_size: int,
                            duration_secs: float, settings: dict,
                            success: bool, failure_reason: Optional[str] = None):
    """Update database after transcode attempt.

    new_path is accepted for call-site symmetry but currently unused;
    the schema keys rows on the original path.
    """
    now = datetime.now().timestamp()
    if success:
        cursor = conn.execute("SELECT original_size FROM files WHERE path = ?", (filepath,))
        row = cursor.fetchone()
        original_size = row[0] if row else 0
        space_saved = original_size - new_size
        compression_ratio = new_size / original_size if original_size > 0 else 0

        conn.execute("""
            UPDATE files SET
                status = 'done',
                transcoded_at = ?,
                transcoded_size = ?,
                transcode_duration_secs = ?,
                transcode_settings = ?,
                space_saved = ?,
                compression_ratio = ?,
                failure_reason = NULL
            WHERE path = ?
        """, (now, new_size, duration_secs, json.dumps(settings),
              space_saved, compression_ratio, filepath))

        # Update running totals
        conn.execute("""
            UPDATE stats SET
                total_files_transcoded = total_files_transcoded + 1,
                total_space_saved = total_space_saved + ?,
                total_transcode_time_secs = total_transcode_time_secs + ?,
                last_updated = ?
            WHERE id = 1
        """, (space_saved, duration_secs, now))
    else:
        conn.execute("""
            UPDATE files SET status = 'failed', failure_reason = ?
            WHERE path = ?
        """, (failure_reason, filepath))
    conn.commit()
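
# For reference, the ffprobe helpers here (get_audio_codec above,
# get_video_codec further down) parse "-of json" output of roughly this
# shape -- representative and trimmed; exact fields vary by ffprobe version:
#
#   {"streams": [{"codec_name": "ac3"}]}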
""", (failure_reason, filepath)) conn.commit() def remove_from_cache(conn: sqlite3.Connection, filepath: str): """Remove a file from the cache.""" conn.execute("DELETE FROM files WHERE path = ?", (filepath,)) conn.commit() def get_cache_stats(conn: sqlite3.Connection) -> dict: """Get comprehensive stats from the cache.""" # File counts and sizes cursor = conn.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN is_hevc = 1 THEN 1 ELSE 0 END) as hevc_count, SUM(CASE WHEN is_hevc = 0 AND status = 'pending' THEN 1 ELSE 0 END) as pending_count, SUM(CASE WHEN status = 'done' THEN 1 ELSE 0 END) as done_count, SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed_count, SUM(CASE WHEN is_hevc = 1 THEN original_size ELSE 0 END) as hevc_size, SUM(CASE WHEN is_hevc = 0 AND status = 'pending' THEN original_size ELSE 0 END) as pending_size FROM files """) row = cursor.fetchone() file_stats = { "total": row[0] or 0, "hevc_count": row[1] or 0, "pending_count": row[2] or 0, "done_count": row[3] or 0, "failed_count": row[4] or 0, "hevc_size": row[5] or 0, "pending_size": row[6] or 0, } # Running totals cursor = conn.execute("SELECT * FROM stats WHERE id = 1") row = cursor.fetchone() if row: file_stats["total_transcoded"] = row[1] or 0 file_stats["total_space_saved"] = row[2] or 0 file_stats["total_transcode_time"] = row[3] or 0 # By directory breakdown cursor = conn.execute(""" SELECT directory, COUNT(*) as count, SUM(CASE WHEN is_hevc = 0 AND status = 'pending' THEN original_size ELSE 0 END) as pending_size FROM files GROUP BY directory ORDER BY pending_size DESC LIMIT 10 """) file_stats["by_directory"] = cursor.fetchall() return file_stats # Ensure log directory exists (skip for /var/log which requires root) if not CONFIG["log_dir"].startswith("/var/"): Path(CONFIG["log_dir"]).mkdir(parents=True, exist_ok=True) # Set up logging log_level = logging.DEBUG if VERBOSE else logging.INFO logging.basicConfig( level=log_level, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[ logging.FileHandler(CONFIG["log_file"]), logging.StreamHandler(sys.stdout) ] ) log = logging.getLogger(__name__) def log_json(event: str, **data): """Write structured JSON log entry (for machine parsing).""" if not JSON_LOG: return entry = { "ts": datetime.now().isoformat(), "event": event, **data } print(json.dumps(entry), file=sys.stderr) def format_bytes(size: int) -> str: """Human-readable byte size.""" for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if abs(size) < 1024.0: return f"{size:.2f} {unit}" size /= 1024.0 return f"{size:.2f} PB" def format_duration(seconds: float) -> str: """Human-readable duration.""" if seconds < 60: return f"{seconds:.1f}s" elif seconds < 3600: return f"{seconds/60:.1f}m" else: return f"{seconds/3600:.1f}h" def get_disk_space(path: str) -> dict: """Get disk space info for a path.""" try: stat = os.statvfs(path) total = stat.f_blocks * stat.f_frsize free = stat.f_bavail * stat.f_frsize used = total - free return { "total": total, "free": free, "used": used, "percent_used": (used / total * 100) if total > 0 else 0 } except Exception as e: log.warning(f"Failed to get disk space for {path}: {e}") return None def check_disk_space(): """Log disk space for all source directories.""" log.info("=" * 50) log.info("Disk Space Check") log.info("=" * 50) checked_mounts = set() for source_dir in CONFIG["source_dirs"]: # Get mount point to avoid duplicate checks try: mount = subprocess.run( ["df", "--output=target", source_dir], capture_output=True, text=True ).stdout.strip().split('\n')[-1] except: mount = 

def get_video_duration(filepath: str) -> Optional[float]:
    """Get video duration in seconds using ffprobe."""
    try:
        result = subprocess.run([
            "ffprobe", "-v", "quiet",
            "-show_entries", "format=duration",
            "-of", "json",
            filepath
        ], capture_output=True, text=True, timeout=30)
        if result.returncode == 0:
            data = json.loads(result.stdout)
            return float(data.get("format", {}).get("duration", 0))
    except Exception as e:
        log.debug(f"Failed to get duration for {filepath}: {e}")
    return None


def handle_interrupt(signum, frame):
    """Handle SIGINT/SIGTERM gracefully."""
    log.warning("=" * 50)
    log.warning("INTERRUPT RECEIVED - cleaning up...")
    log.warning("=" * 50)

    elapsed = None  # Defined up front so the log_json call below is safe
    if CURRENT_TRANSCODE["path"]:
        log.warning(f"Interrupted transcode: {CURRENT_TRANSCODE['path']}")
        if CURRENT_TRANSCODE["start_time"]:
            elapsed = (datetime.now() - CURRENT_TRANSCODE["start_time"]).total_seconds()
            log.warning(f"Elapsed time: {format_duration(elapsed)}")

        # Try to kill ffmpeg if running
        if CURRENT_TRANSCODE["pid"]:
            try:
                os.kill(CURRENT_TRANSCODE["pid"], signal.SIGTERM)
                log.info("Sent SIGTERM to ffmpeg process")
            except OSError:
                pass

        # Clean up partial output
        temp_path = Path(CURRENT_TRANSCODE["path"]).with_suffix(".transcoding.mkv")
        if temp_path.exists():
            log.info(f"Removing partial output: {temp_path}")
            temp_path.unlink()

    log_json("interrupted", file=CURRENT_TRANSCODE["path"], elapsed=elapsed)
    sys.exit(1)


# Register signal handlers
signal.signal(signal.SIGINT, handle_interrupt)
signal.signal(signal.SIGTERM, handle_interrupt)


def get_video_codec(filepath: str) -> Optional[str]:
    """Get the video codec of a file using ffprobe."""
    try:
        result = subprocess.run([
            "ffprobe", "-v", "quiet",
            "-select_streams", "v:0",
            "-show_entries", "stream=codec_name",
            "-of", "json",
            filepath
        ], capture_output=True, text=True, timeout=30)
        if result.returncode == 0:
            data = json.loads(result.stdout)
            streams = data.get("streams", [])
            if streams:
                return streams[0].get("codec_name", "").lower()
    except Exception as e:
        log.warning(f"Failed to probe {filepath}: {e}")
    return None


def is_hevc(filepath: str) -> bool:
    """Check if a file is already HEVC encoded."""
    codec = get_video_codec(filepath)
    return codec in ["hevc", "h265"]


def get_file_size_gb(filepath: str) -> float:
    """Get file size in GB."""
    return os.path.getsize(filepath) / (1024 ** 3)
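
# Operational note (assumed setup, not enforced by this script): the handler
# above makes a manual abort safe - ffmpeg is terminated and partial output
# removed. For example, from a shell:
#
#   kill -TERM "$(pgrep -f overnight_transcoder.py)"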

def find_videos_to_transcode(conn: sqlite3.Connection) -> List[Dict]:
    """Find all non-HEVC videos, sorted by size (biggest first).

    Uses cache for speed."""
    videos = []
    cache_hits = 0
    cache_misses = 0
    files_scanned = 0
    dirs_scanned = 0
    scan_start = datetime.now()

    log.info("Scanning source directories...")

    for source_dir in CONFIG["source_dirs"]:
        if not os.path.exists(source_dir):
            log.warning(f"  ⚠ Source directory not found: {source_dir}")
            continue

        dir_start = datetime.now()
        dir_files = 0
        dir_size = 0
        log.info(f"  📁 Scanning: {source_dir}")

        for root, dirs, files in os.walk(source_dir):
            # Skip cleanup directory
            if ".cleanup" in root:
                continue
            dirs_scanned += 1

            for filename in files:
                ext = os.path.splitext(filename)[1].lower()
                if ext not in CONFIG["video_extensions"]:
                    continue

                filepath = os.path.join(root, filename)
                files_scanned += 1
                dir_files += 1

                try:
                    stat = os.stat(filepath)
                    size = stat.st_size
                    mtime = stat.st_mtime
                    dir_size += size

                    # Check cache first
                    cached = get_cached_file(conn, filepath, size, mtime)
                    if cached:
                        video_codec = cached["video_codec"]
                        audio_codec = cached["audio_codec"]
                        file_is_hevc = cached["is_hevc"]
                        status = cached["status"]
                        cache_hits += 1
                    else:
                        # Need to probe this file
                        log.debug(f"    Probing: {filename}")
                        video_codec = get_video_codec(filepath)
                        audio_codec = get_audio_codec(filepath)
                        # Local name avoids shadowing the is_hevc() helper
                        file_is_hevc = video_codec in ["hevc", "h265"] if video_codec else False
                        status = "done" if file_is_hevc else "pending"
                        cache_file(conn, filepath, size, mtime,
                                   video_codec or "unknown", audio_codec or "unknown",
                                   file_is_hevc)
                        cache_misses += 1

                    videos.append({
                        "path": filepath,
                        "size": size,
                        "size_gb": size / (1024 ** 3),
                        "video_codec": video_codec,
                        "audio_codec": audio_codec,
                        "is_hevc": file_is_hevc,
                        "status": status
                    })
                except OSError as e:
                    log.warning(f"  ⚠ Cannot access {filepath}: {e}")

        dir_elapsed = (datetime.now() - dir_start).total_seconds()
        log.info(f"    Found {dir_files} videos ({format_bytes(dir_size)}) in {format_duration(dir_elapsed)}")
        log_json("scan_directory", directory=source_dir, files=dir_files,
                 size=dir_size, elapsed=dir_elapsed)

    scan_elapsed = (datetime.now() - scan_start).total_seconds()
    cache_rate = (cache_hits / (cache_hits + cache_misses) * 100) if (cache_hits + cache_misses) > 0 else 0

    log.info("")
    log.info(f"Scan complete in {format_duration(scan_elapsed)}")
    log.info(f"  Files scanned:  {files_scanned}")
    log.info(f"  Dirs traversed: {dirs_scanned}")
    log.info(f"  Cache hits:     {cache_hits} ({cache_rate:.1f}%)")
    log.info(f"  Cache misses:   {cache_misses} (probed)")
    log.info("")
    log_json("scan_complete", files=files_scanned, dirs=dirs_scanned,
             cache_hits=cache_hits, cache_misses=cache_misses, elapsed=scan_elapsed)

    # Sort by size descending (biggest first)
    videos.sort(key=lambda x: x["size"], reverse=True)
    return videos


def is_past_cutoff() -> bool:
    """Check if we're past the cutoff time for starting new encodes."""
    now = datetime.now().time()
    return now >= CONFIG["cutoff_time"]
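
# Cutoff semantics, for clarity: is_past_cutoff() only gates *starting* new
# encodes. Worked example: with cron firing at 02:00 and cutoff_time = 06:30,
# an encode started at 06:29 runs to completion (possibly past 07:00); the
# queue simply stops before the next file.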

def verify_output(filepath: str) -> bool:
    """Verify the transcoded file is valid."""
    if not os.path.exists(filepath):
        return False

    # Anything under 1MB is definitely a broken encode
    size = os.path.getsize(filepath)
    if size < 1024 * 1024:
        return False

    # Check it's valid HEVC video with ffprobe
    codec = get_video_codec(filepath)
    if codec != "hevc":
        return False

    return True


def transcode_file(input_path: str, conn: sqlite3.Connection) -> Optional[dict]:
    """Transcode a single file to HEVC.

    Returns result dict on success."""
    input_file = Path(input_path)
    output_path = input_file.with_suffix(".transcoding.mkv")
    final_path = input_file.with_suffix(".mkv")

    original_size = os.path.getsize(input_path)
    video_duration = get_video_duration(input_path)

    log.info("")
    log.info("=" * 60)
    log.info(f"TRANSCODE START: {input_file.name}")
    log.info("=" * 60)
    log.info(f"  Input path:    {input_path}")
    log.info(f"  Output path:   {output_path}")
    log.info(f"  Original size: {format_bytes(original_size)}")
    if video_duration:
        log.info(f"  Duration:      {format_duration(video_duration)}")
    log_json("transcode_start", file=input_path, size=original_size,
             duration=video_duration)

    # Mark as transcoding in DB
    conn.execute("UPDATE files SET status = 'transcoding' WHERE path = ?", (input_path,))
    conn.commit()

    start_time = datetime.now()
    CURRENT_TRANSCODE["path"] = input_path
    CURRENT_TRANSCODE["start_time"] = start_time

    settings = {
        "crf": CONFIG["crf"],
        "preset": CONFIG["preset"],
        "audio_codec": CONFIG["audio_codec"],
        "audio_bitrate": CONFIG["audio_bitrate"],
        "threads": CONFIG["threads"]
    }

    # Build ffmpeg command
    cmd = [
        "ffmpeg", "-y",
        "-threads", str(CONFIG["threads"]),
        "-progress", "pipe:1",  # Progress to stdout for parsing
        "-i", input_path,
        "-c:v", "libx265",
        "-preset", CONFIG["preset"],
        "-crf", str(CONFIG["crf"]),
        "-x265-params", f"pools={CONFIG['threads']}",
        "-c:a", CONFIG["audio_codec"],
        "-b:a", CONFIG["audio_bitrate"],
        "-c:s", "copy",  # Copy subtitles
        str(output_path)
    ]

    # Log the full command for debugging
    cmd_str = ' '.join(f'"{c}"' if ' ' in c else c for c in cmd)
    log.info("  FFmpeg command:")
    log.debug(f"    {cmd_str}")
    if not VERBOSE:
        log.info("    (use --verbose to see full command)")
    log.info("")
    log_json("ffmpeg_command", command=cmd_str)
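
    # For illustration, with the default CONFIG and a hypothetical input the
    # rendered command looks like this (the path is an example only):
    #
    #   ffmpeg -y -threads 12 -progress pipe:1 -i "/disks/Plex/Films/Example (2020).mkv" \
    #     -c:v libx265 -preset slow -crf 20 -x265-params pools=12 \
    #     -c:a ac3 -b:a 640k -c:s copy "/disks/Plex/Films/Example (2020).transcoding.mkv"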

    # Log stderr to file to prevent pipe buffer deadlock
    stderr_log_path = Path(CONFIG["log_dir"]) / "transcoder_ffmpeg_stderr.log"
    max_runtime = 8 * 3600  # Hard wall-clock cap per file (8 hours)

    with open(stderr_log_path, "a") as stderr_file:
        stderr_file.write(f"\n{'='*60}\n{datetime.now()} - {input_path}\n{'='*60}\n")
        stderr_file.flush()

        try:
            # Use Popen for progress monitoring
            # stderr goes to file (not PIPE) to prevent buffer deadlock
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=stderr_file,
                text=True
            )
            CURRENT_TRANSCODE["pid"] = process.pid
            log.debug(f"  FFmpeg PID: {process.pid}")

            # Monitor progress
            last_progress_log = datetime.now()
            progress_interval = 60  # Log progress every 60 seconds
            current_time_us = 0

            last_output_size = 0
            last_output_growth = datetime.now()
            stall_timeout = 600  # 10 minutes with no output growth = stalled

            while True:
                # Enforce the 8-hour cap (handled by the timeout block below)
                if (datetime.now() - start_time).total_seconds() > max_runtime:
                    raise subprocess.TimeoutExpired(cmd, max_runtime)

                # Use select for non-blocking read with timeout
                ready, _, _ = select.select([process.stdout], [], [], 30)
                if ready:
                    line = process.stdout.readline()
                    if not line and process.poll() is not None:
                        break
                    # Parse progress output
                    if line.startswith("out_time_us="):
                        try:
                            current_time_us = int(line.split("=")[1])
                        except (ValueError, IndexError):
                            pass
                elif process.poll() is not None:
                    break

                # Log progress periodically
                now = datetime.now()
                if (now - last_progress_log).total_seconds() >= progress_interval:
                    elapsed = (now - start_time).total_seconds()
                    if video_duration and current_time_us > 0:
                        current_secs = current_time_us / 1_000_000
                        percent = (current_secs / video_duration) * 100
                        eta_secs = (elapsed / percent * 100) - elapsed if percent > 0 else 0
                        log.info(f"  ⏳ Progress: {percent:.1f}% | Elapsed: {format_duration(elapsed)} | ETA: {format_duration(eta_secs)}")
                        log_json("transcode_progress", file=input_path, percent=percent,
                                 elapsed=elapsed, eta=eta_secs)
                    else:
                        log.info(f"  ⏳ Elapsed: {format_duration(elapsed)}")

                    # Check output size growth and detect stalls
                    if output_path.exists():
                        current_size = os.path.getsize(output_path)
                        log.info(f"     Output size: {format_bytes(current_size)}")
                        if current_size > last_output_size:
                            last_output_size = current_size
                            last_output_growth = now
                        elif (now - last_output_growth).total_seconds() > stall_timeout:
                            log.error(f"  🛑 STALL DETECTED: Output file hasn't grown in {stall_timeout}s")
                            log.error(f"     Output stuck at {format_bytes(current_size)}")
                            log.error(f"     Killing ffmpeg (PID {process.pid})")
                            process.kill()
                            process.wait(timeout=30)

                            # Mark as stalled in DB with timestamp for 7-day cooldown
                            conn.execute(
                                "UPDATE files SET status = 'stalled', failure_reason = ?, transcoded_at = ? WHERE path = ?",
                                (f"Output stalled at {format_bytes(current_size)} after {format_duration(elapsed)}",
                                 datetime.now().timestamp(), input_path)
                            )
                            conn.commit()

                            # Clean up partial output
                            if output_path.exists():
                                output_path.unlink()
                                log.info(f"     Cleaned up partial file: {output_path}")
                            CURRENT_TRANSCODE["path"] = None
                            return None

                    last_progress_log = now

            # Reap the process (stderr already went to the file)
            process.wait(timeout=60)

        except subprocess.TimeoutExpired:
            log.error("=" * 60)
            log.error(f"TRANSCODE TIMEOUT (8 hours): {input_path}")
            log.error("=" * 60)
            log_json("transcode_timeout", file=input_path)
            process.kill()
            process.wait()
            if output_path.exists():
                output_path.unlink()
            update_transcode_result(conn, input_path, None, 0, 0, settings,
                                    False, "Timeout (8 hours)")
            CURRENT_TRANSCODE["path"] = None
            return None

        except Exception as e:
            log.error("=" * 60)
            log.error(f"TRANSCODE ERROR: {input_path}")
            log.error("=" * 60)
            log.error(f"  Exception: {type(e).__name__}: {e}")
            log.error("  Traceback:")
            for line in traceback.format_exc().split('\n'):
                if line.strip():
                    log.error(f"    {line}")
            log.error("=" * 60)
            log_json("transcode_error", file=input_path, error=str(e),
                     traceback=traceback.format_exc())
            if output_path.exists():
                output_path.unlink()
            update_transcode_result(conn, input_path, None, 0, 0, settings, False, str(e))
            CURRENT_TRANSCODE["path"] = None
            return None

    if process.returncode != 0:
        # Read last 1000 chars of stderr log for error reporting
        try:
            with open(stderr_log_path, 'r') as f:
                f.seek(max(0, os.path.getsize(stderr_log_path) - 1000))
                error_msg = f.read()
        except OSError:
            error_msg = "Unknown error (check transcoder_ffmpeg_stderr.log)"

        log.error("=" * 60)
        log.error("TRANSCODE FAILED")
        log.error("=" * 60)
        log.error(f"  File: {input_path}")
        log.error(f"  Exit code: {process.returncode}")
        log.error("  FFmpeg error output:")
        for line in error_msg.split('\n')[-20:]:  # Last 20 lines
            if line.strip():
                log.error(f"    {line}")
        log.error("=" * 60)
        log_json("transcode_failed", file=input_path, exit_code=process.returncode,
                 error=error_msg[-500:])
        if output_path.exists():
            output_path.unlink()
        update_transcode_result(conn, input_path, None, 0, 0, settings,
                                False, error_msg[-500:])
        CURRENT_TRANSCODE["path"] = None
        return None

    # Verify output
    log.info("  Verifying output file...")
    if not verify_output(str(output_path)):
        log.error("=" * 60)
        log.error("OUTPUT VERIFICATION FAILED")
        log.error("=" * 60)
        log.error(f"  File: {output_path}")
        if output_path.exists():
            log.error(f"  Output size: {format_bytes(os.path.getsize(output_path))}")
            output_codec = get_video_codec(str(output_path))
            log.error(f"  Output codec: {output_codec}")
        else:
            log.error("  Output file does not exist!")
        log.error("=" * 60)
        log_json("transcode_verify_failed", file=input_path)
        if output_path.exists():
            output_path.unlink()
        update_transcode_result(conn, input_path, None, 0, 0, settings,
                                False, "Output verification failed")
        CURRENT_TRANSCODE["path"] = None
        return None

    new_size = os.path.getsize(output_path)
    duration_secs = (datetime.now() - start_time).total_seconds()
    space_saved = original_size - new_size
    compression_ratio = new_size / original_size if original_size > 0 else 0
    encode_speed = video_duration / duration_secs if video_duration and duration_secs > 0 else 0

    log.info("")
    log.info("=" * 60)
    log.info("✅ TRANSCODE COMPLETE")
    log.info("=" * 60)
    log.info(f"  File:     {input_file.name}")
    log.info(f"  Duration: {format_duration(duration_secs)}")
    log.info(f"  Original: {format_bytes(original_size)}")
    log.info(f"  New size: {format_bytes(new_size)}")
    log.info(f"  Saved:    {format_bytes(space_saved)} ({100*(1-compression_ratio):.1f}% reduction)")
    log.info(f"  Ratio:    {compression_ratio:.2f}x")
    if encode_speed > 0:
        log.info(f"  Speed:    {encode_speed:.2f}x realtime")
    log.info("=" * 60)
    log.info("")
    log_json("transcode_complete", file=input_path, original_size=original_size,
             new_size=new_size, space_saved=space_saved, duration_secs=duration_secs,
             compression_ratio=compression_ratio, encode_speed=encode_speed)

    # Move original to cleanup directory
    cleanup_dir = Path(CONFIG["cleanup_dir"])
    cleanup_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    cleanup_name = f"{timestamp}_{input_file.name}"
    cleanup_path = cleanup_dir / cleanup_name

    log.info(f"  Moving original to cleanup: {cleanup_path}")
    try:
        shutil.move(input_path, cleanup_path)
        log.info("  ✓ Original moved successfully")
        log_json("original_moved", source=input_path, dest=str(cleanup_path))
    except Exception as e:
        log.error(f"  ✗ Failed to move original: {e}")
        log.error(f"  Traceback: {traceback.format_exc()}")
        # Don't fail the whole operation, but keep both files

    # Rename output to final name
    log.info(f"  Renaming output: {output_path} -> {final_path}")
    try:
        output_path.rename(final_path)
        log.info("  ✓ Output renamed successfully")
    except Exception as e:
        log.error(f"  ✗ Failed to rename output: {e}")
        final_path = output_path

    # Fix permissions - group mediaserver, mode 664
    log.debug("  Setting permissions (mediaserver:664)")
    try:
        subprocess.run(["chgrp", "mediaserver", str(final_path)], check=False)
        subprocess.run(["chmod", "664", str(final_path)], check=False)
        log.debug("  ✓ Permissions set")
    except Exception as e:
        log.warning(f"  ✗ Failed to set permissions: {e}")

    # Update database with success
    update_transcode_result(conn, input_path, str(final_path), new_size,
                            duration_secs, settings, True)

    CURRENT_TRANSCODE["path"] = None
    return {
        "output_path": str(final_path),
        "new_size": new_size,
        "duration_secs": duration_secs,
        "space_saved": space_saved
    }

def cleanup_old_files():
    """Delete original files older than cleanup_days."""
    cleanup_dir = Path(CONFIG["cleanup_dir"])
    if not cleanup_dir.exists():
        log.debug("Cleanup directory doesn't exist, skipping cleanup")
        return

    log.info("Checking cleanup directory for old originals...")
    cutoff = datetime.now().timestamp() - (CONFIG["cleanup_days"] * 24 * 3600)
    cutoff_date = datetime.fromtimestamp(cutoff).strftime('%Y-%m-%d %H:%M')
    log.debug(f"  Cutoff date: {cutoff_date} ({CONFIG['cleanup_days']} days ago)")

    deleted_count = 0
    deleted_size = 0
    kept_count = 0
    kept_size = 0

    for filepath in cleanup_dir.iterdir():
        if filepath.is_file():
            mtime = filepath.stat().st_mtime
            size = filepath.stat().st_size
            if mtime < cutoff:
                try:
                    filepath.unlink()
                    deleted_count += 1
                    deleted_size += size
                    log.info(f"  🗑️ Deleted: {filepath.name} ({format_bytes(size)})")
                    log_json("cleanup_deleted", file=str(filepath), size=size)
                except Exception as e:
                    log.warning(f"  ⚠ Failed to delete {filepath}: {e}")
            else:
                kept_count += 1
                kept_size += size
                days_old = (datetime.now().timestamp() - mtime) / 86400
                log.debug(f"  ⏳ Keeping: {filepath.name} ({days_old:.1f} days old)")

    if deleted_count > 0:
        log.info(f"  Cleanup: deleted {deleted_count} files, freed {format_bytes(deleted_size)}")
        log_json("cleanup_complete", deleted=deleted_count, freed=deleted_size)
    else:
        log.info(f"  Cleanup: nothing to delete ({kept_count} files still in retention)")
    if kept_count > 0:
        log.debug(f"  Retention: {kept_count} files ({format_bytes(kept_size)}) waiting")


def main():
    # Initialize database
    conn = init_db()

    # Handle --stats flag
    if args.stats:
        stats = get_cache_stats(conn)
        print("=" * 60)
        print("Transcoder Statistics")
        print("=" * 60)
        print(f"Total files scanned:  {stats['total']}")
        print(f"Already HEVC:         {stats['hevc_count']} ({stats['hevc_size']/(1024**3):.2f} GB)")
        print(f"Pending transcode:    {stats['pending_count']} ({stats['pending_size']/(1024**3):.2f} GB)")
        print(f"Completed:            {stats['done_count']}")
        print(f"Failed:               {stats['failed_count']}")
        print("")
        print("--- Lifetime Stats ---")
        print(f"Total transcoded:     {stats.get('total_transcoded', 0)} files")
        print(f"Total space saved:    {stats.get('total_space_saved', 0)/(1024**3):.2f} GB")
        total_time = stats.get('total_transcode_time', 0)
        print(f"Total transcode time: {total_time/3600:.1f} hours")
        if stats['pending_count'] > 0:
            est_savings = stats['pending_size'] * 0.5
            print(f"\nEstimated remaining: ~{est_savings/(1024**3):.2f} GB savings")
        if stats.get('by_directory'):
            print("\n--- Top Directories (by pending size) ---")
            for dir_path, count, pending in stats['by_directory'][:5]:
                if pending > 0:
                    # Shorten path for display
                    short = dir_path.replace("/disks/Plex/", "")
                    print(f"  {short}: {pending/(1024**3):.2f} GB pending")
        print("=" * 60)
        conn.close()
        return

    # Handle --clear-cache flag
    if args.clear_cache:
        conn.execute("DELETE FROM files")
        conn.execute("UPDATE stats SET total_files_transcoded=0, total_space_saved=0, total_transcode_time_secs=0")
        conn.commit()
        print("Cache cleared.")
        conn.close()
        return

    # Handle --failed flag
    if args.failed:
        cursor = conn.execute("""
            SELECT path, original_size, failure_reason
            FROM files WHERE status = 'failed'
            ORDER BY original_size DESC
        """)
        rows = cursor.fetchall()
        print("=" * 60)
        print(f"Failed Transcodes ({len(rows)} files)")
        print("=" * 60)
        for path, size, reason in rows:
            print(f"\n{path}")
            print(f"  Size:   {size/(1024**3):.2f} GB")
            print(f"  Reason: {reason}")
        if not rows:
            print("No failed transcodes!")
        print("=" * 60)
        conn.close()
        return

    # Handle --retry-failed flag
    if args.retry_failed:
        cursor = conn.execute("UPDATE files SET status = 'pending', failure_reason = NULL WHERE status = 'failed'")
        count = cursor.rowcount
        conn.commit()
        print(f"Reset {count} failed files to pending.")
        conn.close()
        return

    session_start = datetime.now()
    log.info("")
    log.info("=" * 70)
    log.info(" OVERNIGHT VIDEO TRANSCODER")
    log.info("=" * 70)
    log.info(f"  Started:  {session_start.strftime('%Y-%m-%d %H:%M:%S')}")
    log.info(f"  Mode:     {'DRY RUN' if DRY_RUN else 'LIVE'}")
    log.info(f"  Verbose:  {'YES' if VERBOSE else 'NO'}")
    log.info(f"  JSON log: {'YES' if JSON_LOG else 'NO'}")
    log.info(f"  Cutoff:   {CONFIG['cutoff_time']}")
    log.info(f"  Settings: CRF={CONFIG['crf']}, preset={CONFIG['preset']}, audio={CONFIG['audio_codec']}@{CONFIG['audio_bitrate']}")
    log.info("=" * 70)
    log.info("")
    log_json("session_start", mode="dry_run" if DRY_RUN else "live", config=CONFIG)

    # Check disk space first
    check_disk_space()

    # Run cleanup first (skip in dry run)
    if not DRY_RUN:
        cleanup_old_files()

    # Find videos to transcode
    videos = find_videos_to_transcode(conn)

    # Filter to non-HEVC work (using cached codec info). Stalled files are
    # kept in the queue so the 7-day retry check below is reachable.
    to_transcode = []
    hevc_count = 0
    hevc_size = 0
    for video in videos:
        if video.get("is_hevc", False) or video.get("status") == "done":
            hevc_count += 1
            hevc_size += video["size"]
        elif video.get("status") in ("pending", "stalled"):
            to_transcode.append(video)

    log.info(f"Already HEVC: {hevc_count} files ({hevc_size / (1024**3):.2f} GB)")
    log.info(f"Need transcoding: {len(to_transcode)} files")

    if DRY_RUN:
        log.info("")
        log.info("=" * 60)
        log.info("DRY RUN - Files that would be transcoded (biggest first):")
        log.info("=" * 60)
        # Totals cover the whole queue, not just the 20 files shown
        total_size = sum(v["size"] for v in to_transcode)
        # Estimate ~50% compression for HEVC
        estimated_savings = total_size * 0.5
        for i, video in enumerate(to_transcode[:20], 1):  # Show top 20
            log.info(f"{i:2}. [{video['size_gb']:6.2f} GB] {video['path']}")
        if len(to_transcode) > 20:
            log.info(f"    ... and {len(to_transcode) - 20} more files")
        log.info("")
        log.info("=" * 60)
        log.info(f"Total to transcode: {total_size / (1024**3):.2f} GB")
        log.info(f"Estimated savings (~50%): {estimated_savings / (1024**3):.2f} GB")
        log.info("=" * 60)
        conn.close()
        return

    # Normal run
    transcoded_count = 0
    total_saved = 0
    failed_count = 0
    skipped_count = 0

    log.info("")
    log.info("=" * 60)
    log.info("Starting transcode queue")
    log.info("=" * 60)
    log.info(f"  Files to process: {len(to_transcode)}")
    log.info(f"  Total size: {format_bytes(sum(v['size'] for v in to_transcode))}")
    log.info("")

    for i, video in enumerate(to_transcode, 1):
        # Check cutoff time
        if is_past_cutoff():
            remaining = len(to_transcode) - i + 1
            log.info("")
            log.info("=" * 60)
            log.info(f"⏰ CUTOFF TIME REACHED ({CONFIG['cutoff_time']})")
            log.info(f"   Stopping with {remaining} files remaining in queue")
            log.info("=" * 60)
            log_json("cutoff_reached", remaining_files=remaining)
            break

        # Skip already done or failed (safety net; filtered out above)
        if video.get("status") in ["done", "failed"]:
            log.debug(f"  Skipping (status={video['status']}): {video['path']}")
            skipped_count += 1
            continue

        # Skip stalled files unless 7 days have passed
        if video.get("status") == "stalled":
            row = conn.execute(
                "SELECT transcoded_at FROM files WHERE path = ?", (video['path'],)
            ).fetchone()
            if row and row[0]:
                stalled_at = datetime.fromtimestamp(row[0])
                days_since = (datetime.now() - stalled_at).days
                if days_since < 7:
                    log.debug(f"  Skipping stalled file ({days_since}d ago, retry after 7d): {video['path']}")
                    skipped_count += 1
                    continue
                else:
                    log.info(f"  Retrying stalled file ({days_since}d cooldown passed): {video['path']}")
            else:
                log.debug(f"  Skipping stalled file (no timestamp): {video['path']}")
                skipped_count += 1
                continue

        log.info(f"[{i}/{len(to_transcode)}] Queued: {Path(video['path']).name}")
        log.info(f"  Size:  {format_bytes(video['size'])}")
        log.info(f"  Codec: video={video.get('video_codec', 'unknown')}, audio={video.get('audio_codec', 'unknown')}")
        log.info(f"  Path:  {video['path']}")
        log_json("transcode_queued", index=i, total=len(to_transcode),
                 file=video['path'], size=video['size'],
                 video_codec=video.get('video_codec'),
                 audio_codec=video.get('audio_codec'))

        # Transcode
        result = transcode_file(video["path"], conn)
        if result:
            total_saved += result["space_saved"]
            transcoded_count += 1
            log.info(f"  Running totals: {transcoded_count} done, {format_bytes(total_saved)} saved")
        else:
            failed_count += 1
            log.error(f"  ❌ Transcode #{i} failed: {video['path']}")
            log.info(f"  Running totals: {transcoded_count} done, {failed_count} failed")
{video['path']}") log.info(f" Running totals: {transcoded_count} done, {failed_count} failed") session_end = datetime.now() session_duration = (session_end - session_start).total_seconds() log.info("") log.info("=" * 70) log.info(" SESSION COMPLETE") log.info("=" * 70) log.info(f" Started: {session_start.strftime('%Y-%m-%d %H:%M:%S')}") log.info(f" Ended: {session_end.strftime('%Y-%m-%d %H:%M:%S')}") log.info(f" Duration: {format_duration(session_duration)}") log.info(f" Transcoded: {transcoded_count} files") log.info(f" Failed: {failed_count} files") log.info(f" Skipped: {skipped_count} files") log.info(f" Space saved: {format_bytes(total_saved)}") # Get updated lifetime stats stats = get_cache_stats(conn) log.info("") log.info(" --- Lifetime Totals ---") log.info(f" Total transcoded: {stats.get('total_transcoded', 0)} files") log.info(f" Total saved: {format_bytes(stats.get('total_space_saved', 0))}") log.info(f" Pending: {stats.get('pending_count', 0)} files ({format_bytes(stats.get('pending_size', 0))})") log.info("=" * 70) log.info("") log_json("session_complete", started=session_start.isoformat(), ended=session_end.isoformat(), duration_secs=session_duration, transcoded=transcoded_count, space_saved=total_saved, lifetime_transcoded=stats.get('total_transcoded', 0), lifetime_saved=stats.get('total_space_saved', 0), pending_count=stats.get('pending_count', 0)) conn.close() if __name__ == "__main__": main()