diff options
Diffstat (limited to 'music_recommender.py')
| -rwxr-xr-x | music_recommender.py | 1038 |
1 files changed, 1038 insertions, 0 deletions
diff --git a/music_recommender.py b/music_recommender.py new file mode 100755 index 0000000..a366f9a --- /dev/null +++ b/music_recommender.py @@ -0,0 +1,1038 @@ +#!/usr/bin/env python3 +""" +Music Recommender - Finds music you might like based on Last.fm history, +cross-referenced with your Navidrome collection. Can auto-download via slskd. + +Usage: + python music_recommender.py recommend # Get recommendations + python music_recommender.py gaps # Find library gaps + python music_recommender.py recent # Recent scrobbles + python music_recommender.py search "query" # Search Navidrome + python music_recommender.py surprise # Pick & download an album overnight + python music_recommender.py slskd "query" # Search Soulseek + python music_recommender.py download "artist - album" # Download from Soulseek +""" + +import argparse +import hashlib +import json +import os +import random +import re +import string +import sys +import time +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional +from urllib.parse import urlencode + +import requests + +# Default config path +CONFIG_PATH = Path(__file__).parent / "music_config.json" +STATE_PATH = Path(__file__).parent / "music_state.json" + + +class LastFM: + """Last.fm API client.""" + + BASE_URL = "https://ws.audioscrobbler.com/2.0/" + + def __init__(self, api_key: str, username: str): + self.api_key = api_key + self.username = username + + def _request(self, method: str, **params) -> dict: + params.update({ + "method": method, + "api_key": self.api_key, + "format": "json", + }) + resp = requests.get(self.BASE_URL, params=params) + resp.raise_for_status() + return resp.json() + + def get_recent_tracks(self, limit: int = 50) -> list[dict]: + """Get recently played tracks.""" + data = self._request("user.getrecenttracks", user=self.username, limit=limit) + tracks = data.get("recenttracks", {}).get("track", []) + return [ + { + "artist": t.get("artist", {}).get("#text", ""), + "album": t.get("album", {}).get("#text", ""), + "track": t.get("name", ""), + "now_playing": "@attr" in t and t["@attr"].get("nowplaying") == "true", + } + for t in tracks + ] + + def get_top_artists(self, period: str = "3month", limit: int = 50) -> list[dict]: + """Get top artists. Period: overall, 7day, 1month, 3month, 6month, 12month.""" + data = self._request("user.gettopartists", user=self.username, period=period, limit=limit) + artists = data.get("topartists", {}).get("artist", []) + return [ + { + "name": a.get("name", ""), + "playcount": int(a.get("playcount", 0)), + "url": a.get("url", ""), + } + for a in artists + ] + + def get_similar_artists(self, artist: str, limit: int = 10) -> list[dict]: + """Get artists similar to the given artist.""" + try: + data = self._request("artist.getsimilar", artist=artist, limit=limit) + similar = data.get("similarartists", {}).get("artist", []) + return [ + { + "name": a.get("name", ""), + "match": float(a.get("match", 0)), + "url": a.get("url", ""), + } + for a in similar + ] + except Exception: + return [] + + def get_artist_top_albums(self, artist: str, limit: int = 5) -> list[dict]: + """Get top albums for an artist.""" + try: + data = self._request("artist.gettopalbums", artist=artist, limit=limit) + albums = data.get("topalbums", {}).get("album", []) + return [ + { + "name": a.get("name", ""), + "artist": artist, + "playcount": int(a.get("playcount", 0)), + "url": a.get("url", ""), + } + for a in albums + if a.get("name") and a.get("name") != "(null)" + ] + except Exception: + return [] + + def get_recommendations(self, limit: int = 20) -> list[dict]: + """Get recommended artists based on listening history.""" + top = self.get_top_artists(period="3month", limit=10) + + recommendations = {} + for artist in top: + similar = self.get_similar_artists(artist["name"], limit=5) + for s in similar: + name = s["name"] + if name not in recommendations and name not in [a["name"] for a in top]: + recommendations[name] = { + "name": name, + "because_of": artist["name"], + "match": s["match"], + } + + sorted_recs = sorted(recommendations.values(), key=lambda x: x["match"], reverse=True) + return sorted_recs[:limit] + + +class Navidrome: + """Navidrome/Subsonic API client.""" + + def __init__(self, url: str, username: str, password: str): + self.base_url = url.rstrip("/") + self.username = username + self.password = password + + def _auth_params(self) -> dict: + salt = "".join(random.choices(string.ascii_lowercase + string.digits, k=8)) + token = hashlib.md5((self.password + salt).encode()).hexdigest() + return { + "u": self.username, + "t": token, + "s": salt, + "v": "1.16.1", + "c": "music_recommender", + "f": "json", + } + + def _request(self, endpoint: str, **params) -> dict: + params.update(self._auth_params()) + url = f"{self.base_url}/rest/{endpoint}" + resp = requests.get(url, params=params) + resp.raise_for_status() + data = resp.json() + + sr = data.get("subsonic-response", {}) + if sr.get("status") == "failed": + error = sr.get("error", {}) + raise Exception(f"Subsonic error {error.get('code')}: {error.get('message')}") + + return sr + + def ping(self) -> bool: + try: + self._request("ping") + return True + except Exception: + return False + + def get_artists(self) -> list[dict]: + data = self._request("getArtists") + artists = [] + for index in data.get("artists", {}).get("index", []): + for artist in index.get("artist", []): + artists.append({ + "id": artist.get("id"), + "name": artist.get("name"), + "album_count": artist.get("albumCount", 0), + }) + return artists + + def get_artist(self, artist_id: str) -> dict: + data = self._request("getArtist", id=artist_id) + artist = data.get("artist", {}) + return { + "id": artist.get("id"), + "name": artist.get("name"), + "album_count": artist.get("albumCount", 0), + "albums": [ + { + "id": a.get("id"), + "name": a.get("name"), + "year": a.get("year"), + "song_count": a.get("songCount", 0), + } + for a in artist.get("album", []) + ], + } + + def search(self, query: str) -> dict: + data = self._request("search3", query=query) + result = data.get("searchResult3", {}) + return { + "artists": [ + {"id": a.get("id"), "name": a.get("name")} + for a in result.get("artist", []) + ], + "albums": [ + {"id": a.get("id"), "name": a.get("name"), "artist": a.get("artist")} + for a in result.get("album", []) + ], + "songs": [ + {"id": s.get("id"), "title": s.get("title"), "artist": s.get("artist"), "album": s.get("album")} + for s in result.get("song", []) + ], + } + + +class Slskd: + """slskd API client for Soulseek searches and downloads.""" + + def __init__(self, url: str, api_key: str): + self.base_url = url.rstrip("/") + self.api_key = api_key + self.headers = {"X-API-Key": api_key} + + def _request(self, method: str, endpoint: str, **kwargs) -> dict: + url = f"{self.base_url}/api/v0/{endpoint}" + kwargs.setdefault("headers", {}).update(self.headers) + resp = requests.request(method, url, **kwargs) + resp.raise_for_status() + if resp.content: + return resp.json() + return {} + + def search(self, query: str, timeout: int = 30) -> str: + """Start a search and return the search ID.""" + data = self._request("POST", "searches", json={"searchText": query}) + return data.get("id") + + def get_search_state(self, search_id: str) -> dict: + """Get search state.""" + return self._request("GET", f"searches/{search_id}") + + def wait_for_search(self, search_id: str, timeout: int = 60, min_results: int = 5, min_wait: int = 30) -> bool: + """Wait for search to complete or have enough results. + + Args: + search_id: The search ID to wait for + timeout: Maximum time to wait + min_results: Return early if we have this many files + min_wait: Minimum time to wait before honoring isComplete with 0 results + """ + start = time.time() + while time.time() - start < timeout: + state = self.get_search_state(search_id) + file_count = state.get("fileCount", 0) + elapsed = time.time() - start + + # Got enough results - done + if file_count >= min_results: + return True + + # Search complete with results - done + if state.get("isComplete") and file_count > 0: + return True + + # Search complete with NO results - only trust after min_wait + # (Soulseek can mark complete before results arrive) + if state.get("isComplete") and elapsed >= min_wait: + return True + + time.sleep(2) + return False + + def get_search_responses(self, search_id: str, max_retries: int = 15, retry_delay: float = 2) -> list[dict]: + """Get search responses (results grouped by user). + + Retries if response list is empty but state shows results exist. + slskd has a race condition where /responses can lag significantly + behind state.responseCount - sometimes 10+ seconds. + """ + for attempt in range(max_retries): + responses = self._request("GET", f"searches/{search_id}/responses") + if responses: + return responses + + # Check if state says there should be responses + state = self.get_search_state(search_id) + if state.get("responseCount", 0) == 0: + # Genuinely no results + return [] + + # State has responses but endpoint returned empty - wait and retry + time.sleep(retry_delay) + + return [] + + def download_files(self, username: str, files: list[dict]) -> dict: + """Queue files for download from a user.""" + return self._request("POST", f"transfers/downloads/{username}", json=files) + + def get_downloads(self) -> list[dict]: + """Get current downloads.""" + return self._request("GET", "transfers/downloads") + + def search_and_find_album(self, artist: str, album: str, timeout: int = 60) -> Optional[dict]: + """Search for an album and return best result.""" + query = f"{artist} {album}" + print(f" Searching Soulseek for: {query}") + + search_id = self.search(query) + if not search_id: + return None + + # Wait for results + self.wait_for_search(search_id, timeout=timeout, min_results=3) + + responses = self.get_search_responses(search_id) + if not responses: + return None + + # Score and rank results + best_match = None + best_score = 0 + + for response in responses: + username = response.get("username", "") + files = response.get("files", []) + + # Skip users with no free slots + if not response.get("hasFreeUploadSlot", False): + continue + + # Group files by directory + directories = {} + for f in files: + filename = f.get("filename", "") + # Extract directory + parts = filename.replace("\\", "/").rsplit("/", 1) + if len(parts) == 2: + dir_path, fname = parts + else: + dir_path, fname = "", parts[0] + + if dir_path not in directories: + directories[dir_path] = [] + directories[dir_path].append(f) + + # Find directories that look like albums (multiple audio files) + for dir_path, dir_files in directories.items(): + audio_files = [f for f in dir_files if any( + f.get("filename", "").lower().endswith(ext) + for ext in [".flac", ".mp3", ".m4a", ".ogg", ".opus"] + )] + + if len(audio_files) < 3: + continue + + # Score this directory + score = 0 + dir_lower = dir_path.lower() + + # Prefer FLAC + flac_count = sum(1 for f in audio_files if f.get("filename", "").lower().endswith(".flac")) + if flac_count == len(audio_files): + score += 100 + elif flac_count > 0: + score += 50 + + # Check artist/album in path + if artist.lower() in dir_lower: + score += 30 + if album.lower() in dir_lower: + score += 30 + + # Prefer more complete albums + score += min(len(audio_files), 20) + + # Prefer faster users (higher upload speed) + score += min(response.get("uploadSpeed", 0) // 100000, 20) + + if score > best_score: + best_score = score + best_match = { + "username": username, + "directory": dir_path, + "files": audio_files, + "file_count": len(audio_files), + "score": score, + "has_flac": flac_count > 0, + } + + return best_match + + def download_album(self, match: dict) -> bool: + """Download an album match.""" + if not match: + return False + + username = match["username"] + files = match["files"] + + # Format files for download API + download_files = [ + {"filename": f["filename"], "size": f.get("size", 0)} + for f in files + ] + + try: + self.download_files(username, download_files) + return True + except Exception as e: + print(f" Download failed: {e}") + return False + + def get_user_downloads(self, username: str) -> list[dict]: + """Get downloads from a specific user.""" + try: + all_downloads = self._request("GET", "transfers/downloads") + # Filter by username - strip invisible Unicode chars for robust matching + clean = lambda s: ''.join(c for c in (s or '') if c.isprintable()) + target = clean(username) + return [d for d in all_downloads if clean(d.get("username", "")) == target] + except Exception: + return [] + + def cancel_user_downloads(self, username: str) -> bool: + """Cancel all downloads from a user.""" + try: + self._request("DELETE", f"transfers/downloads/{username}") + return True + except Exception: + return False + + def wait_for_download(self, username: str, expected_files: int, + timeout_minutes: int = 30, check_interval: int = 60) -> bool: + """Wait for downloads from a user to complete.""" + import time + + start = time.time() + timeout_seconds = timeout_minutes * 60 + + while time.time() - start < timeout_seconds: + downloads = self.get_user_downloads(username) + + if not downloads: + # No downloads found - might have completed and been removed + # Or never started + return False + + # Check states + completed = sum(1 for d in downloads + for f in d.get("files", []) + if f.get("state", "").startswith("Completed")) + failed = sum(1 for d in downloads + for f in d.get("files", []) + if any(s in (f.get("state") or "") for s in ["Errored", "Cancelled", "TimedOut", "Rejected", "Aborted"])) + in_progress = sum(1 for d in downloads + for f in d.get("files", []) + if any(s in (f.get("state") or "") for s in ["Queued", "Initializing", "InProgress"])) + + total = completed + failed + in_progress + + print(f" Progress: {completed}/{total} complete, {failed} failed, {in_progress} in progress") + + if completed >= expected_files: + return True + + if failed > 0 and in_progress == 0: + # All either completed or failed, nothing in progress + return completed > 0 + + time.sleep(check_interval) + + # Timeout + return False + + +class MusicRecommender: + """Main recommender combining all sources.""" + + def __init__(self, config: dict): + self.config = config + + self.lastfm = LastFM( + api_key=config["lastfm"]["api_key"], + username=config["lastfm"]["username"], + ) + + self.navidrome = None + if "navidrome" in config: + self.navidrome = Navidrome( + url=config["navidrome"]["url"], + username=config["navidrome"]["username"], + password=config["navidrome"]["password"], + ) + + self.slskd = None + if "slskd" in config: + self.slskd = Slskd( + url=config["slskd"]["url"], + api_key=config["slskd"]["api_key"], + ) + + def get_library_artists(self) -> set[str]: + if not self.navidrome: + return set() + artists = self.navidrome.get_artists() + return {a["name"].lower() for a in artists} + + def get_library_albums(self) -> set[str]: + """Get set of 'artist - album' strings in library.""" + if not self.navidrome: + return set() + + albums = set() + artists = self.navidrome.get_artists() + for artist in artists: + try: + details = self.navidrome.get_artist(artist["id"]) + for album in details.get("albums", []): + key = f"{artist['name'].lower()} - {album['name'].lower()}" + albums.add(key) + except Exception: + pass + return albums + + def find_gaps(self) -> list[dict]: + """Find artists you listen to but don't have in your library.""" + top_artists = self.lastfm.get_top_artists(period="6month", limit=50) + library = self.get_library_artists() + + # Filter out YouTube auto-channels and other noise + noise_patterns = [" - topic", "official", "vevo", "cyber gardens"] + + gaps = [] + for artist in top_artists: + name_lower = artist["name"].lower() + + # Skip noise + if any(p in name_lower for p in noise_patterns): + continue + + if name_lower not in library: + gaps.append({ + "name": artist["name"], + "playcount": artist["playcount"], + "reason": "listened to but not in library", + }) + + return gaps + + def get_disliked_artists(self) -> set[str]: + """Get set of disliked artist names (lowercase) from config feedback.""" + disliked = set() + feedback = self.config.get("feedback", {}) + for entry in feedback.get("disliked", []): + if "artist" in entry: + disliked.add(entry["artist"].lower()) + return disliked + + def get_recommendations(self) -> list[dict]: + recs = self.lastfm.get_recommendations(limit=30) + library = self.get_library_artists() + disliked = self.get_disliked_artists() + filtered = [r for r in recs if r["name"].lower() not in library and r["name"].lower() not in disliked] + return filtered[:20] + + def pick_surprise_album(self) -> Optional[dict]: + """Pick an album to download as a surprise.""" + # Load state to avoid repeats + state = self._load_state() + downloaded = set(state.get("downloaded_albums", [])) + + # Strategy 1: Album from a gap artist (artist you listen to but don't own) + gaps = self.find_gaps()[:10] + + # Strategy 2: Album from a recommended artist + recs = self.get_recommendations()[:10] + + # Combine and shuffle + candidates = [] + + for gap in gaps: + albums = self.lastfm.get_artist_top_albums(gap["name"], limit=3) + for album in albums: + key = f"{gap['name'].lower()} - {album['name'].lower()}" + if key not in downloaded: + candidates.append({ + "artist": gap["name"], + "album": album["name"], + "reason": f"You've played {gap['name']} {gap['playcount']} times but don't own this", + "priority": 2, # Higher priority for gaps + }) + + for rec in recs: + albums = self.lastfm.get_artist_top_albums(rec["name"], limit=2) + for album in albums: + key = f"{rec['name'].lower()} - {album['name'].lower()}" + if key not in downloaded: + candidates.append({ + "artist": rec["name"], + "album": album["name"], + "reason": f"Similar to {rec['because_of']}", + "priority": 1, + }) + + if not candidates: + return None + + # Sort by priority, then shuffle within priority + candidates.sort(key=lambda x: -x["priority"]) + top_priority = candidates[0]["priority"] + top_candidates = [c for c in candidates if c["priority"] == top_priority] + + return random.choice(top_candidates) + + def surprise_download(self, max_attempts: int = 3, wait_for_completion: bool = False, + timeout_minutes: int = 30) -> dict: + """Pick and download a surprise album. Tries multiple albums if needed. + + Args: + max_attempts: Maximum albums to try + wait_for_completion: If True, wait for download to complete (for overnight job) + timeout_minutes: How long to wait for each download before giving up + """ + if not self.slskd: + return {"success": False, "error": "slskd not configured"} + + # Get all candidates + state = self._load_state() + downloaded = set(state.get("downloaded_albums", [])) + disliked = self.get_disliked_artists() + + candidates = [] + + # From gaps (higher priority) - skip disliked artists + for gap in self.find_gaps()[:15]: + if gap["name"].lower() in disliked: + continue + albums = self.lastfm.get_artist_top_albums(gap["name"], limit=3) + for album in albums: + key = f"{gap['name'].lower()} - {album['name'].lower()}" + if key not in downloaded: + candidates.append({ + "artist": gap["name"], + "album": album["name"], + "reason": f"You've played {gap['name']} {gap['playcount']} times but don't own this", + "priority": 2, + }) + + # From recommendations + for rec in self.get_recommendations()[:10]: + albums = self.lastfm.get_artist_top_albums(rec["name"], limit=2) + for album in albums: + key = f"{rec['name'].lower()} - {album['name'].lower()}" + if key not in downloaded: + candidates.append({ + "artist": rec["name"], + "album": album["name"], + "reason": f"Similar to {rec['because_of']}", + "priority": 1, + }) + + # From Discogs labels (curated 90s labels) + discogs_path = Path(__file__).parent / "discogs_labels.json" + if discogs_path.exists(): + try: + with open(discogs_path) as f: + discogs_data = json.load(f) + # Pick random releases from the pool + discogs_releases = discogs_data.get("releases", []) + random.shuffle(discogs_releases) + for release in discogs_releases[:20]: + if release['artist'].lower() in disliked: + continue + key = f"{release['artist'].lower()} - {release['album'].lower()}" + if key not in downloaded: + year_str = f" ({release['year']})" if release.get('year') else "" + candidates.append({ + "artist": release["artist"], + "album": release["album"], + "reason": f"From {release['label']}{year_str}", + "priority": 1, # Same priority as recommendations + }) + except Exception: + pass # Silently skip if file is corrupt + + if not candidates: + return {"success": False, "error": "No suitable albums found"} + + # Shuffle within priority groups + random.shuffle(candidates) + candidates.sort(key=lambda x: -x["priority"]) + + # Try up to max_attempts albums + tried = [] + for pick in candidates[:max_attempts]: + print(f"\nš² Trying: {pick['artist']} - {pick['album']}") + print(f" Reason: {pick['reason']}\n") + + match = self.slskd.search_and_find_album(pick["artist"], pick["album"]) + + if not match: + print(f" ā Not found on Soulseek, trying next...") + tried.append(f"{pick['artist']} - {pick['album']} (not found)") + continue + + print(f" Found: {match['file_count']} files from {match['username']}") + print(f" FLAC: {'Yes' if match['has_flac'] else 'No'}") + + if not self.slskd.download_album(match): + tried.append(f"{pick['artist']} - {pick['album']} (queue failed)") + continue + + print(f" ā Download queued!") + + # If not waiting for completion, return success after queuing + if not wait_for_completion: + self._save_downloaded(pick["artist"], pick["album"], pick.get("reason")) + return { + "success": True, + "artist": pick["artist"], + "album": pick["album"], + "reason": pick["reason"], + "files": match["file_count"], + "source": match["username"], + "has_flac": match["has_flac"], + "attempts": len(tried) + 1, + "status": "queued", + } + + # Wait for download to complete + print(f" ā³ Waiting up to {timeout_minutes} minutes for download...") + + if self.slskd.wait_for_download(match["username"], match["file_count"], + timeout_minutes=timeout_minutes): + self._save_downloaded(pick["artist"], pick["album"], pick.get("reason")) + return { + "success": True, + "artist": pick["artist"], + "album": pick["album"], + "reason": pick["reason"], + "files": match["file_count"], + "source": match["username"], + "has_flac": match["has_flac"], + "attempts": len(tried) + 1, + "status": "completed", + } + else: + # Download timed out - cancel and try next + print(f" ā±ļø Timeout! Cancelling and trying next album...") + self.slskd.cancel_user_downloads(match["username"]) + tried.append(f"{pick['artist']} - {pick['album']} (timeout after {timeout_minutes}min)") + continue + + # Save failed attempts to wishlist for Discogs bookmarking + self._save_wishlist(tried) + + return { + "success": False, + "error": f"Tried {len(tried)} albums, none worked", + "tried": tried, + } + + def _save_wishlist(self, albums: list[str]): + """Save albums that couldn't be found to wishlist + bookmarks.""" + wishlist_path = Path(__file__).parent / "music_wishlist.md" + + # Read existing wishlist + existing = set() + if wishlist_path.exists(): + with open(wishlist_path) as f: + for line in f: + if line.startswith("- [ ] "): + existing.add(line.strip()[6:].split(" ā ")[0]) + + # Add new entries to markdown wishlist + with open(wishlist_path, "a") as f: + if not wishlist_path.exists() or wishlist_path.stat().st_size == 0: + f.write("# Music Wishlist\n\n") + f.write("Albums that couldn't be found on Soulseek.\n") + f.write("Search Discogs/Bandcamp manually or check back later.\n\n") + + for album in albums: + clean = album.replace(" (download failed)", "").replace(" (not found)", "").replace(" (queue failed)", "") + clean = clean.split(" (timeout")[0] # Remove timeout messages + if clean not in existing: + query = clean.replace(" - ", " ").replace(" ", "+") + discogs_url = f"https://www.discogs.com/search/?q={query}&type=release" + f.write(f"- [ ] {clean} ā [Discogs]({discogs_url})\n") + + # Also add to Floccus bookmarks + self._add_to_bookmarks(clean, discogs_url) + + def _add_to_bookmarks(self, title: str, url: str): + """Add album to Floccus bookmarks in Music/Wishlist folder.""" + import subprocess + + add_bookmark_script = Path(__file__).parent / "add_bookmark_to_wishlist.js" + if add_bookmark_script.exists(): + try: + subprocess.run( + ["node", str(add_bookmark_script), url, title], + capture_output=True, + timeout=30 + ) + print(f" š Added to bookmarks: {title}") + except Exception as e: + print(f" ā Bookmark add failed: {e}") + + def _load_state(self) -> dict: + if STATE_PATH.exists(): + with open(STATE_PATH) as f: + return json.load(f) + return {"downloaded_albums": []} + + def _save_downloaded(self, artist: str, album: str, reason: str = None): + state = self._load_state() + key = f"{artist.lower()} - {album.lower()}" + if key not in state["downloaded_albums"]: + state["downloaded_albums"].append(key) + # Keep last 100 + state["downloaded_albums"] = state["downloaded_albums"][-100:] + state["last_download"] = { + "artist": artist, + "album": album, + "reason": reason, + "timestamp": datetime.now().isoformat(), + } + with open(STATE_PATH, "w") as f: + json.dump(state, f, indent=2) + + def format_recommendations(self, recs: list[dict]) -> str: + lines = ["# Music Recommendations\n"] + lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n") + + for i, rec in enumerate(recs, 1): + lines.append(f"{i}. **{rec['name']}**") + if "because_of" in rec: + lines.append(f" Similar to: {rec['because_of']}") + lines.append("") + + return "\n".join(lines) + + def format_gaps(self, gaps: list[dict]) -> str: + lines = ["# Library Gaps\n"] + lines.append("Artists you listen to but don't have in Navidrome:\n") + + for i, gap in enumerate(gaps, 1): + lines.append(f"{i}. **{gap['name']}** ({gap['playcount']} plays)") + + return "\n".join(lines) + + +def load_config(path: Path) -> dict: + if not path.exists(): + print(f"Config not found: {path}") + print("Create it with your API keys. See music_config.example.json") + sys.exit(1) + + with open(path) as f: + return json.load(f) + + +def main(): + parser = argparse.ArgumentParser(description="Music recommendation tool") + parser.add_argument("command", nargs="?", default="recommend", + choices=["recommend", "gaps", "recent", "search", "test", + "surprise", "slskd", "download"], + help="Command to run") + parser.add_argument("query", nargs="?", help="Search query") + parser.add_argument("--config", type=Path, default=CONFIG_PATH, + help="Path to config file") + parser.add_argument("--json", action="store_true", help="Output as JSON") + parser.add_argument("--wait", action="store_true", + help="Wait for download to complete (for overnight jobs)") + parser.add_argument("--timeout", type=int, default=30, + help="Timeout in minutes for download completion (default: 30)") + + args = parser.parse_args() + config = load_config(args.config) + recommender = MusicRecommender(config) + + if args.command == "test": + print("Testing Last.fm...", end=" ") + try: + recent = recommender.lastfm.get_recent_tracks(limit=1) + print(f"OK (last track: {recent[0]['track'][:30] if recent else 'none'}...)") + except Exception as e: + print(f"FAILED: {e}") + + if recommender.navidrome: + print("Testing Navidrome...", end=" ") + if recommender.navidrome.ping(): + artists = recommender.navidrome.get_artists() + print(f"OK ({len(artists)} artists)") + else: + print("FAILED") + + if recommender.slskd: + print("Testing slskd...", end=" ") + try: + recommender.slskd._request("GET", "server") + print("OK") + except Exception as e: + print(f"FAILED: {e}") + + elif args.command == "recommend": + recs = recommender.get_recommendations() + if args.json: + print(json.dumps(recs, indent=2)) + else: + print(recommender.format_recommendations(recs)) + + elif args.command == "gaps": + gaps = recommender.find_gaps() + if args.json: + print(json.dumps(gaps, indent=2)) + else: + print(recommender.format_gaps(gaps)) + + elif args.command == "recent": + recent = recommender.lastfm.get_recent_tracks(limit=20) + if args.json: + print(json.dumps(recent, indent=2)) + else: + print("# Recent Tracks\n") + for t in recent: + status = "šµ NOW" if t["now_playing"] else " " + print(f"{status} {t['artist']} - {t['track']}") + + elif args.command == "search": + if not args.query: + print("Search requires a query") + sys.exit(1) + + if recommender.navidrome: + print(f"Searching Navidrome for '{args.query}'...") + results = recommender.navidrome.search(args.query) + if results["artists"]: + print("\nArtists in library:") + for a in results["artists"][:5]: + print(f" - {a['name']}") + if results["albums"]: + print("\nAlbums in library:") + for a in results["albums"][:5]: + print(f" - {a['artist']} - {a['name']}") + + elif args.command == "surprise": + result = recommender.surprise_download( + wait_for_completion=args.wait, + timeout_minutes=args.timeout, + ) + if args.json: + print(json.dumps(result, indent=2)) + else: + if result["success"]: + status = result.get("status", "queued") + print(f"\nā
{'Download complete!' if status == 'completed' else 'Queued for download!'}") + print(f" {result['artist']} - {result['album']}") + print(f" {result['files']} files {'(FLAC)' if result['has_flac'] else '(MP3)'}") + print(f" From: {result['source']}") + if result.get("attempts", 1) > 1: + print(f" (took {result['attempts']} attempts)") + else: + print(f"\nā {result['error']}") + if "tried" in result: + print(" Tried:") + for t in result["tried"]: + print(f" - {t}") + + elif args.command == "slskd": + if not args.query: + print("slskd search requires a query") + sys.exit(1) + + if not recommender.slskd: + print("slskd not configured") + sys.exit(1) + + print(f"Searching Soulseek for '{args.query}'...") + search_id = recommender.slskd.search(args.query) + recommender.slskd.wait_for_search(search_id, timeout=30) + # Brief pause to ensure /responses endpoint is populated + time.sleep(2) + responses = recommender.slskd.get_search_responses(search_id) + + print(f"\nFound {len(responses)} users with results:\n") + for resp in responses[:10]: + username = resp.get("username", "?") + files = resp.get("files", []) + has_slot = "ā" if resp.get("hasFreeUploadSlot") else "ā" + print(f" {has_slot} {username}: {len(files)} files") + + elif args.command == "download": + if not args.query: + print("download requires 'artist - album'") + sys.exit(1) + + if not recommender.slskd: + print("slskd not configured") + sys.exit(1) + + # Parse "artist - album" + if " - " in args.query: + artist, album = args.query.split(" - ", 1) + else: + print("Format: 'Artist - Album'") + sys.exit(1) + + print(f"Searching for: {artist} - {album}") + match = recommender.slskd.search_and_find_album(artist, album) + + if match: + print(f"\nBest match: {match['file_count']} files from {match['username']}") + print(f"FLAC: {'Yes' if match['has_flac'] else 'No'}") + print(f"Directory: {match['directory'][:60]}...") + + if recommender.slskd.download_album(match): + print("\nā
Download queued!") + else: + print("\nā Download failed") + else: + print("\nā No suitable results found") + + +if __name__ == "__main__": + main() |
