#!/usr/bin/env python3
"""
Music Recommender - Finds music you might like based on Last.fm history,
cross-referenced with your Navidrome collection. Can auto-download via slskd.

Usage:
    python music_recommender.py recommend                   # Get recommendations
    python music_recommender.py gaps                        # Find library gaps
    python music_recommender.py recent                      # Recent scrobbles
    python music_recommender.py search "query"              # Search Navidrome
    python music_recommender.py surprise                    # Pick & download an album overnight
    python music_recommender.py slskd "query"               # Search Soulseek
    python music_recommender.py download "artist - album"   # Download from Soulseek
"""

import argparse
import hashlib
import json
import os
import random
import re
import string
import subprocess
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional
from urllib.parse import quote, urlencode

import requests

# Default config path
CONFIG_PATH = Path(__file__).parent / "music_config.json"
STATE_PATH = Path(__file__).parent / "music_state.json"

# Seconds before an HTTP call is abandoned. requests has no default timeout,
# so without this a stalled server would block the script indefinitely.
HTTP_TIMEOUT = 30

# File extensions treated as audio when scoring Soulseek directories.
AUDIO_EXTENSIONS = (".flac", ".mp3", ".m4a", ".ogg", ".opus")

# Substrings of a transfer's "state" that mark it as failed / still active.
FAILED_STATES = ("Errored", "Cancelled", "TimedOut", "Rejected", "Aborted")
ACTIVE_STATES = ("Queued", "Initializing", "InProgress")


def _as_list(value) -> list:
    """Return *value* as a list, wrapping a lone item and mapping None to []."""
    if value is None:
        return []
    if isinstance(value, list):
        return value
    return [value]


class SubsonicError(Exception):
    """Raised when a Subsonic response reports status "failed"."""


class LastFM:
    """Last.fm API client."""

    BASE_URL = "https://ws.audioscrobbler.com/2.0/"

    def __init__(self, api_key: str, username: str):
        self.api_key = api_key
        self.username = username

    def _request(self, method: str, **params) -> dict:
        """Call a Last.fm API method and return the decoded JSON body.

        Raises:
            requests.RequestException: on network failure, timeout or a
                non-2xx HTTP status.
        """
        params.update({
            "method": method,
            "api_key": self.api_key,
            "format": "json",
        })
        resp = requests.get(self.BASE_URL, params=params, timeout=HTTP_TIMEOUT)
        resp.raise_for_status()
        return resp.json()

    def get_recent_tracks(self, limit: int = 50) -> list[dict]:
        """Get recently played tracks."""
        data = self._request("user.getrecenttracks", user=self.username, limit=limit)
        # NOTE(review): Last.fm may return a single object instead of a
        # one-element list; _as_list tolerates both shapes - confirm.
        tracks = _as_list(data.get("recenttracks", {}).get("track", []))
        return [
            {
                "artist": t.get("artist", {}).get("#text", ""),
                "album": t.get("album", {}).get("#text", ""),
                "track": t.get("name", ""),
                "now_playing": t.get("@attr", {}).get("nowplaying") == "true",
            }
            for t in tracks
        ]

    def get_top_artists(self, period: str = "3month", limit: int = 50) -> list[dict]:
        """Get top artists.

        Period: overall, 7day, 1month, 3month, 6month, 12month.
        """
        data = self._request(
            "user.gettopartists", user=self.username, period=period, limit=limit
        )
        artists = _as_list(data.get("topartists", {}).get("artist", []))
        return [
            {
                "name": a.get("name", ""),
                "playcount": int(a.get("playcount", 0)),
                "url": a.get("url", ""),
            }
            for a in artists
        ]

    def get_similar_artists(self, artist: str, limit: int = 10) -> list[dict]:
        """Get artists similar to the given artist.

        Best-effort: any failure yields an empty list.
        """
        try:
            data = self._request("artist.getsimilar", artist=artist, limit=limit)
            similar = _as_list(data.get("similarartists", {}).get("artist", []))
            return [
                {
                    "name": a.get("name", ""),
                    "match": float(a.get("match", 0)),
                    "url": a.get("url", ""),
                }
                for a in similar
            ]
        except Exception:
            return []

    def get_artist_top_albums(self, artist: str, limit: int = 5) -> list[dict]:
        """Get top albums for an artist.

        Albums with an empty name or the placeholder "(null)" are dropped.
        Best-effort: any failure yields an empty list.
        """
        try:
            data = self._request("artist.gettopalbums", artist=artist, limit=limit)
            albums = _as_list(data.get("topalbums", {}).get("album", []))
            return [
                {
                    "name": a.get("name", ""),
                    "artist": artist,
                    "playcount": int(a.get("playcount", 0)),
                    "url": a.get("url", ""),
                }
                for a in albums
                if a.get("name") and a.get("name") != "(null)"
            ]
        except Exception:
            return []

    def get_recommendations(self, limit: int = 20) -> list[dict]:
        """Get recommended artists based on listening history.

        Takes the 10 top artists of the last 3 months, collects up to 5
        similar artists for each, drops the top artists themselves, and
        returns the best `limit` by match score. The first top artist that
        yields a given name is recorded as its "because_of".
        """
        top = self.get_top_artists(period="3month", limit=10)
        # Built once; the original rebuilt this list for every candidate.
        already_listened = {a["name"] for a in top}

        recommendations = {}
        for artist in top:
            for s in self.get_similar_artists(artist["name"], limit=5):
                name = s["name"]
                if name in recommendations or name in already_listened:
                    continue
                recommendations[name] = {
                    "name": name,
                    "because_of": artist["name"],
                    "match": s["match"],
                }

        ranked = sorted(recommendations.values(), key=lambda x: x["match"], reverse=True)
        return ranked[:limit]


class Navidrome:
    """Navidrome/Subsonic API client."""

    def __init__(self, url: str, username: str, password: str):
        self.base_url = url.rstrip("/")
        self.username = username
        self.password = password

    def _auth_params(self) -> dict:
        """Build the token-auth query parameters (md5(password + salt))."""
        # The salt is sent in the clear alongside the token, so it is a
        # per-request nonce rather than a secret.
        salt = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
        token = hashlib.md5((self.password + salt).encode()).hexdigest()
        return {
            "u": self.username,
            "t": token,
            "s": salt,
            "v": "1.16.1",
            "c": "music_recommender",
            "f": "json",
        }

    def _request(self, endpoint: str, **params) -> dict:
        """Call a Subsonic REST endpoint and return its "subsonic-response".

        Raises:
            requests.RequestException: on network failure, timeout or a
                non-2xx HTTP status.
            SubsonicError: when the response status is "failed".
        """
        params.update(self._auth_params())
        url = f"{self.base_url}/rest/{endpoint}"
        resp = requests.get(url, params=params, timeout=HTTP_TIMEOUT)
        resp.raise_for_status()
        sr = resp.json().get("subsonic-response", {})
        if sr.get("status") == "failed":
            error = sr.get("error", {})
            raise SubsonicError(
                f"Subsonic error {error.get('code')}: {error.get('message')}"
            )
        return sr

    def ping(self) -> bool:
        """Return True if the server answers a ping without error."""
        try:
            self._request("ping")
            return True
        except Exception:
            return False

    def get_artists(self) -> list[dict]:
        """Return every artist as {"id", "name", "album_count"}."""
        data = self._request("getArtists")
        return [
            {
                "id": artist.get("id"),
                "name": artist.get("name"),
                "album_count": artist.get("albumCount", 0),
            }
            for index in data.get("artists", {}).get("index", [])
            for artist in index.get("artist", [])
        ]

    def get_artist(self, artist_id: str) -> dict:
        """Return one artist together with a summary of its albums."""
        data = self._request("getArtist", id=artist_id)
        artist = data.get("artist", {})
        return {
            "id": artist.get("id"),
            "name": artist.get("name"),
            "album_count": artist.get("albumCount", 0),
            "albums": [
                {
                    "id": a.get("id"),
                    "name": a.get("name"),
                    "year": a.get("year"),
                    "song_count": a.get("songCount", 0),
                }
                for a in artist.get("album", [])
            ],
        }

    def search(self, query: str) -> dict:
        """Run a search3 query; return matching artists, albums and songs."""
        data = self._request("search3", query=query)
        result = data.get("searchResult3", {})
        return {
            "artists": [
                {"id": a.get("id"), "name": a.get("name")}
                for a in result.get("artist", [])
            ],
            "albums": [
                {"id": a.get("id"), "name": a.get("name"), "artist": a.get("artist")}
                for a in result.get("album", [])
            ],
            "songs": [
                {
                    "id": s.get("id"),
                    "title": s.get("title"),
                    "artist": s.get("artist"),
                    "album": s.get("album"),
                }
                for s in result.get("song", [])
            ],
        }


class Slskd:
    """slskd API client for Soulseek searches and downloads."""

    def __init__(self, url: str, api_key: str):
        self.base_url = url.rstrip("/")
        self.api_key = api_key
        self.headers = {"X-API-Key": api_key}

    def _request(self, method: str, endpoint: str, **kwargs) -> dict:
        """Send an authenticated request to /api/v0/<endpoint>.

        Returns the decoded JSON body, or {} when the response is empty.

        Raises:
            requests.RequestException: on network failure, timeout or a
                non-2xx HTTP status.
        """
        url = f"{self.base_url}/api/v0/{endpoint}"
        # Merge into a fresh dict so a caller-supplied headers dict is not mutated.
        kwargs["headers"] = {**(kwargs.pop("headers", None) or {}), **self.headers}
        kwargs.setdefault("timeout", HTTP_TIMEOUT)
        resp = requests.request(method, url, **kwargs)
        resp.raise_for_status()
        if resp.content:
            return resp.json()
        return {}

    @staticmethod
    def _user_path(username: str) -> str:
        """Percent-encode a username for use as a single URL path segment."""
        return quote(username, safe="")

    @staticmethod
    def _iter_transfer_files(download: dict):
        """Yield every file entry of one per-user download record.

        NOTE(review): slskd appears to nest files under "directories" rather
        than directly under the user record - confirm against the API. Both
        shapes are accepted here.
        """
        yield from download.get("files") or []
        for directory in download.get("directories") or []:
            yield from directory.get("files") or []

    def search(self, query: str, timeout: int = 30) -> str:
        """Start a search and return the search ID.

        `timeout` is accepted for interface compatibility but is not used.
        """
        data = self._request("POST", "searches", json={"searchText": query})
        return data.get("id")

    def get_search_state(self, search_id: str) -> dict:
        """Get search state."""
        return self._request("GET", f"searches/{search_id}")

    def wait_for_search(self, search_id: str, timeout: int = 60,
                        min_results: int = 5, min_wait: int = 30) -> bool:
        """Wait for search to complete or have enough results.

        Args:
            search_id: The search ID to wait for
            timeout: Maximum time to wait
            min_results: Return early if we have this many files
            min_wait: Minimum time to wait before honoring isComplete with 0 results

        Returns:
            True when one of the completion conditions was met, False on timeout.
        """
        start = time.time()
        while time.time() - start < timeout:
            state = self.get_search_state(search_id)
            file_count = state.get("fileCount", 0)
            elapsed = time.time() - start

            # Got enough results - done
            if file_count >= min_results:
                return True

            if state.get("isComplete"):
                # Search complete with results - done
                if file_count > 0:
                    return True
                # Search complete with NO results - only trust after min_wait
                # (Soulseek can mark complete before results arrive)
                if elapsed >= min_wait:
                    return True

            time.sleep(2)
        return False

    def get_search_responses(self, search_id: str, max_retries: int = 15,
                             retry_delay: float = 2) -> list[dict]:
        """Get search responses (results grouped by user).

        Retries if response list is empty but state shows results exist.
        slskd has a race condition where /responses can lag significantly
        behind state.responseCount - sometimes 10+ seconds.
        """
        for _ in range(max_retries):
            responses = self._request("GET", f"searches/{search_id}/responses")
            if responses:
                return responses

            # Check if state says there should be responses
            state = self.get_search_state(search_id)
            if state.get("responseCount", 0) == 0:
                # Genuinely no results
                return []

            # State has responses but endpoint returned empty - wait and retry
            time.sleep(retry_delay)
        return []

    def download_files(self, username: str, files: list[dict]) -> dict:
        """Queue files for download from a user."""
        return self._request(
            "POST", f"transfers/downloads/{self._user_path(username)}", json=files
        )

    def get_downloads(self) -> list[dict]:
        """Get current downloads."""
        return self._request("GET", "transfers/downloads")

    def search_and_find_album(self, artist: str, album: str,
                              timeout: int = 60) -> Optional[dict]:
        """Search for an album and return best result.

        Each directory offered by a user with a free upload slot and holding
        at least three audio files is scored; the highest-scoring one wins.

        Returns:
            A dict with "username", "directory", "files", "file_count",
            "score" and "has_flac", or None when nothing suitable was found.
        """
        query = f"{artist} {album}"
        print(f" Searching Soulseek for: {query}")

        search_id = self.search(query)
        if not search_id:
            return None

        # Wait for results
        self.wait_for_search(search_id, timeout=timeout, min_results=3)
        responses = self.get_search_responses(search_id)
        if not responses:
            return None

        artist_lower = artist.lower()
        album_lower = album.lower()

        # Score and rank results
        best_match = None
        best_score = 0
        for response in responses:
            # Skip users with no free slots
            if not response.get("hasFreeUploadSlot", False):
                continue

            username = response.get("username", "")
            # Prefer faster users (higher upload speed)
            speed_bonus = min(response.get("uploadSpeed", 0) // 100000, 20)

            # Group files by directory (path separators normalised to "/")
            directories = {}
            for f in response.get("files", []):
                filename = f.get("filename", "")
                dir_path = filename.replace("\\", "/").rpartition("/")[0]
                directories.setdefault(dir_path, []).append(f)

            # Find directories that look like albums (multiple audio files)
            for dir_path, dir_files in directories.items():
                audio_files = [
                    f for f in dir_files
                    if f.get("filename", "").lower().endswith(AUDIO_EXTENSIONS)
                ]
                if len(audio_files) < 3:
                    continue

                # Score this directory
                score = 0
                dir_lower = dir_path.lower()

                # Prefer FLAC
                flac_count = sum(
                    1 for f in audio_files
                    if f.get("filename", "").lower().endswith(".flac")
                )
                if flac_count == len(audio_files):
                    score += 100
                elif flac_count > 0:
                    score += 50

                # Check artist/album in path
                if artist_lower in dir_lower:
                    score += 30
                if album_lower in dir_lower:
                    score += 30

                # Prefer more complete albums
                score += min(len(audio_files), 20)
                score += speed_bonus

                if score > best_score:
                    best_score = score
                    best_match = {
                        "username": username,
                        "directory": dir_path,
                        "files": audio_files,
                        "file_count": len(audio_files),
                        "score": score,
                        "has_flac": flac_count > 0,
                    }

        return best_match

    def download_album(self, match: dict) -> bool:
        """Download an album match.

        Returns True when the files were queued, False when `match` is empty
        or the queue request failed.
        """
        if not match:
            return False

        # Format files for download API
        payload = [
            {"filename": f["filename"], "size": f.get("size", 0)}
            for f in match["files"]
        ]
        try:
            self.download_files(match["username"], payload)
            return True
        except Exception as e:
            print(f" Download failed: {e}")
            return False

    def get_user_downloads(self, username: str) -> list[dict]:
        """Get downloads from a specific user.

        Best-effort: any failure yields an empty list.
        """
        def clean(s) -> str:
            # Strip invisible Unicode chars for robust matching
            return "".join(c for c in (s or "") if c.isprintable())

        try:
            all_downloads = self._request("GET", "transfers/downloads")
            target = clean(username)
            return [d for d in all_downloads if clean(d.get("username", "")) == target]
        except Exception:
            return []

    def cancel_user_downloads(self, username: str) -> bool:
        """Cancel all downloads from a user."""
        try:
            self._request("DELETE", f"transfers/downloads/{self._user_path(username)}")
            return True
        except Exception:
            return False

    def wait_for_download(self, username: str, expected_files: int,
                          timeout_minutes: int = 30,
                          check_interval: int = 60) -> bool:
        """Wait for downloads from a user to complete.

        Args:
            username: User whose transfers are polled
            expected_files: Number of completed files that counts as success
            timeout_minutes: Maximum time to wait
            check_interval: Seconds between polls

        Returns:
            True once `expected_files` files have completed, or when nothing
            is left in progress and at least one file completed. False on
            timeout, when no transfers are listed, or when every file failed.
        """
        start = time.time()
        timeout_seconds = timeout_minutes * 60

        while time.time() - start < timeout_seconds:
            downloads = self.get_user_downloads(username)
            if not downloads:
                # No downloads found - might have completed and been removed
                # Or never started
                return False

            # Classify each file exactly once. Failure markers are checked
            # first so a state that both starts with "Completed" and names a
            # failure (e.g. "Completed, Errored") is not counted as a success.
            completed = failed = in_progress = 0
            for download in downloads:
                for f in self._iter_transfer_files(download):
                    state = f.get("state") or ""
                    if any(marker in state for marker in FAILED_STATES):
                        failed += 1
                    elif state.startswith("Completed"):
                        completed += 1
                    elif any(marker in state for marker in ACTIVE_STATES):
                        in_progress += 1

            total = completed + failed + in_progress
            print(f" Progress: {completed}/{total} complete, {failed} failed, {in_progress} in progress")

            if completed >= expected_files:
                return True
            if failed > 0 and in_progress == 0:
                # All either completed or failed, nothing in progress
                return completed > 0

            time.sleep(check_interval)

        # Timeout
        return False


class MusicRecommender:
    """Main recommender combining all sources."""

    def __init__(self, config: dict):
        self.config = config
        self.lastfm = LastFM(
            api_key=config["lastfm"]["api_key"],
            username=config["lastfm"]["username"],
        )

        # Navidrome and slskd are optional sections of the config.
        self.navidrome = None
        if "navidrome" in config:
            self.navidrome = Navidrome(
                url=config["navidrome"]["url"],
                username=config["navidrome"]["username"],
                password=config["navidrome"]["password"],
            )

        self.slskd = None
        if "slskd" in config:
            self.slskd = Slskd(
                url=config["slskd"]["url"],
                api_key=config["slskd"]["api_key"],
            )

    def get_library_artists(self) -> set[str]:
        """Get lowercase names of all artists in the Navidrome library."""
        if not self.navidrome:
            return set()
        # Artists without a name are skipped instead of crashing on .lower().
        return {a["name"].lower() for a in self.navidrome.get_artists() if a.get("name")}

    def get_library_albums(self) -> set[str]:
        """Get set of 'artist - album' strings in library."""
        if not self.navidrome:
            return set()

        albums = set()
        for artist in self.navidrome.get_artists():
            try:
                details = self.navidrome.get_artist(artist["id"])
                for album in details.get("albums", []):
                    albums.add(f"{artist['name'].lower()} - {album['name'].lower()}")
            except Exception:
                # Best-effort: one unreadable artist must not abort the scan.
                continue
        return albums

    def find_gaps(self) -> list[dict]:
        """Find artists you listen to but don't have in your library."""
        top_artists = self.lastfm.get_top_artists(period="6month", limit=50)
        library = self.get_library_artists()

        # Filter out YouTube auto-channels and other noise
        noise_patterns = [" - topic", "official", "vevo", "cyber gardens"]

        gaps = []
        for artist in top_artists:
            name_lower = artist["name"].lower()
            # Skip noise
            if any(p in name_lower for p in noise_patterns):
                continue
            if name_lower not in library:
                gaps.append({
                    "name": artist["name"],
                    "playcount": artist["playcount"],
                    "reason": "listened to but not in library",
                })
        return gaps

    def get_disliked_artists(self) -> set[str]:
        """Get set of disliked artist names (lowercase) from config feedback."""
        feedback = self.config.get("feedback", {})
        return {
            entry["artist"].lower()
            for entry in feedback.get("disliked", [])
            if "artist" in entry
        }

    def get_recommendations(self) -> list[dict]:
        """Get up to 20 recommended artists not in the library and not disliked."""
        recs = self.lastfm.get_recommendations(limit=30)
        library = self.get_library_artists()
        disliked = self.get_disliked_artists()
        filtered = [
            r for r in recs
            if r["name"].lower() not in library and r["name"].lower() not in disliked
        ]
        return filtered[:20]

    def _album_candidates(self, artist: str, limit: int, downloaded: set,
                          reason: str, priority: int) -> list[dict]:
        """Build candidate entries from an artist's top albums, skipping downloaded ones."""
        return [
            {
                "artist": artist,
                "album": album["name"],
                "reason": reason,
                "priority": priority,
            }
            for album in self.lastfm.get_artist_top_albums(artist, limit=limit)
            if f"{artist.lower()} - {album['name'].lower()}" not in downloaded
        ]

    def pick_surprise_album(self) -> Optional[dict]:
        """Pick an album to download as a surprise.

        Returns a random candidate from the highest priority group present,
        or None when there are no candidates.
        """
        # Load state to avoid repeats
        state = self._load_state()
        downloaded = set(state.get("downloaded_albums", []))

        # Strategy 1: Album from a gap artist (artist you listen to but don't own)
        gaps = self.find_gaps()[:10]
        # Strategy 2: Album from a recommended artist
        recs = self.get_recommendations()[:10]

        candidates = []
        for gap in gaps:
            candidates += self._album_candidates(
                gap["name"], 3, downloaded,
                f"You've played {gap['name']} {gap['playcount']} times but don't own this",
                2,  # Higher priority for gaps
            )
        for rec in recs:
            candidates += self._album_candidates(
                rec["name"], 2, downloaded, f"Similar to {rec['because_of']}", 1
            )

        if not candidates:
            return None

        # Choose at random among the candidates sharing the highest priority
        top_priority = max(c["priority"] for c in candidates)
        top_candidates = [c for c in candidates if c["priority"] == top_priority]
        return random.choice(top_candidates)

    @staticmethod
    def _download_result(pick: dict, match: dict, attempts: int, status: str) -> dict:
        """Build the success payload returned by surprise_download."""
        return {
            "success": True,
            "artist": pick["artist"],
            "album": pick["album"],
            "reason": pick["reason"],
            "files": match["file_count"],
            "source": match["username"],
            "has_flac": match["has_flac"],
            "attempts": attempts,
            "status": status,
        }

    def surprise_download(self, max_attempts: int = 3,
                          wait_for_completion: bool = False,
                          timeout_minutes: int = 30) -> dict:
        """Pick and download a surprise album. Tries multiple albums if needed.

        Args:
            max_attempts: Maximum albums to try
            wait_for_completion: If True, wait for download to complete (for overnight job)
            timeout_minutes: How long to wait for each download before giving up

        Returns:
            On success, a dict with "success": True plus album, source and
            "status" ("queued" or "completed"). On failure, "success": False
            with "error" and, when albums were attempted, "tried".
        """
        if not self.slskd:
            return {"success": False, "error": "slskd not configured"}

        # Get all candidates
        state = self._load_state()
        downloaded = set(state.get("downloaded_albums", []))
        disliked = self.get_disliked_artists()
        candidates = []

        # From gaps (higher priority) - skip disliked artists
        for gap in self.find_gaps()[:15]:
            if gap["name"].lower() in disliked:
                continue
            candidates += self._album_candidates(
                gap["name"], 3, downloaded,
                f"You've played {gap['name']} {gap['playcount']} times but don't own this",
                2,
            )

        # From recommendations
        for rec in self.get_recommendations()[:10]:
            candidates += self._album_candidates(
                rec["name"], 2, downloaded, f"Similar to {rec['because_of']}", 1
            )

        # From Discogs labels (curated 90s labels)
        discogs_path = Path(__file__).parent / "discogs_labels.json"
        if discogs_path.exists():
            try:
                with open(discogs_path, encoding="utf-8") as f:
                    discogs_data = json.load(f)
                # Pick random releases from the pool
                discogs_releases = discogs_data.get("releases", [])
                random.shuffle(discogs_releases)
                for release in discogs_releases[:20]:
                    if release["artist"].lower() in disliked:
                        continue
                    key = f"{release['artist'].lower()} - {release['album'].lower()}"
                    if key not in downloaded:
                        year_str = f" ({release['year']})" if release.get("year") else ""
                        candidates.append({
                            "artist": release["artist"],
                            "album": release["album"],
                            "reason": f"From {release['label']}{year_str}",
                            "priority": 1,  # Same priority as recommendations
                        })
            except Exception as exc:
                # Still best-effort, but say so on stderr instead of passing silently.
                print(f"Skipping {discogs_path.name}: {exc}", file=sys.stderr)

        if not candidates:
            return {"success": False, "error": "No suitable albums found"}

        # Shuffle within priority groups (the sort is stable)
        random.shuffle(candidates)
        candidates.sort(key=lambda x: -x["priority"])

        # Try up to max_attempts albums
        tried = []
        for pick in candidates[:max_attempts]:
            label = f"{pick['artist']} - {pick['album']}"
            print(f"\n🎲 Trying: {pick['artist']} - {pick['album']}")
            print(f" Reason: {pick['reason']}\n")

            match = self.slskd.search_and_find_album(pick["artist"], pick["album"])
            if not match:
                print(f" ❌ Not found on Soulseek, trying next...")
                tried.append(f"{label} (not found)")
                continue

            print(f" Found: {match['file_count']} files from {match['username']}")
            print(f" FLAC: {'Yes' if match['has_flac'] else 'No'}")

            if not self.slskd.download_album(match):
                tried.append(f"{label} (queue failed)")
                continue

            print(f" ✓ Download queued!")

            # If not waiting for completion, return success after queuing
            if not wait_for_completion:
                self._save_downloaded(pick["artist"], pick["album"], pick.get("reason"))
                return self._download_result(pick, match, len(tried) + 1, "queued")

            # Wait for download to complete
            print(f" ⏳ Waiting up to {timeout_minutes} minutes for download...")
            if self.slskd.wait_for_download(match["username"], match["file_count"],
                                            timeout_minutes=timeout_minutes):
                self._save_downloaded(pick["artist"], pick["album"], pick.get("reason"))
                return self._download_result(pick, match, len(tried) + 1, "completed")

            # Download timed out - cancel and try next
            print(f" ⏱️ Timeout! Cancelling and trying next album...")
            self.slskd.cancel_user_downloads(match["username"])
            tried.append(f"{label} (timeout after {timeout_minutes}min)")

        # Save failed attempts to wishlist for Discogs bookmarking
        self._save_wishlist(tried)

        return {
            "success": False,
            "error": f"Tried {len(tried)} albums, none worked",
            "tried": tried,
        }

    def _save_wishlist(self, albums: list[str]):
        """Save albums that couldn't be found to wishlist + bookmarks."""
        wishlist_path = Path(__file__).parent / "music_wishlist.md"

        # Read existing wishlist
        existing = set()
        if wishlist_path.exists():
            with open(wishlist_path, encoding="utf-8") as f:
                for line in f:
                    if line.startswith("- [ ] "):
                        existing.add(line.strip()[6:].split(" — ")[0])

        needs_header = not wishlist_path.exists() or wishlist_path.stat().st_size == 0

        # Add new entries to markdown wishlist
        with open(wishlist_path, "a", encoding="utf-8") as f:
            if needs_header:
                f.write("# Music Wishlist\n\n")
                f.write("Albums that couldn't be found on Soulseek.\n")
                f.write("Search Discogs/Bandcamp manually or check back later.\n\n")

            for album in albums:
                clean = (
                    album.replace(" (download failed)", "")
                    .replace(" (not found)", "")
                    .replace(" (queue failed)", "")
                )
                clean = clean.split(" (timeout")[0]  # Remove timeout messages
                if clean in existing:
                    continue
                existing.add(clean)  # also guards against repeats within this batch

                # urlencode escapes "&", "#", parentheses etc. that would
                # otherwise break the URL or the markdown link.
                query = urlencode({"q": clean.replace(" - ", " "), "type": "release"})
                discogs_url = f"https://www.discogs.com/search/?{query}"
                f.write(f"- [ ] {clean} — [Discogs]({discogs_url})\n")

                # Also add to Floccus bookmarks
                self._add_to_bookmarks(clean, discogs_url)

    def _add_to_bookmarks(self, title: str, url: str):
        """Add album to Floccus bookmarks in Music/Wishlist folder.

        Does nothing when the helper script is not present next to this file.
        """
        add_bookmark_script = Path(__file__).parent / "add_bookmark_to_wishlist.js"
        if not add_bookmark_script.exists():
            return
        try:
            # check=True so a non-zero exit is reported as a failure instead
            # of being announced as a success.
            subprocess.run(
                ["node", str(add_bookmark_script), url, title],
                capture_output=True,
                timeout=30,
                check=True,
            )
            print(f" 📌 Added to bookmarks: {title}")
        except Exception as e:
            print(f" ⚠ Bookmark add failed: {e}")

    def _load_state(self) -> dict:
        """Load persisted state, or a fresh default when no state file exists."""
        if STATE_PATH.exists():
            with open(STATE_PATH, encoding="utf-8") as f:
                return json.load(f)
        return {"downloaded_albums": []}

    def _save_downloaded(self, artist: str, album: str, reason: Optional[str] = None):
        """Record an album as downloaded and persist the state file."""
        state = self._load_state()
        key = f"{artist.lower()} - {album.lower()}"
        # setdefault: tolerate a state file that lacks the key.
        history = state.setdefault("downloaded_albums", [])
        if key not in history:
            history.append(key)
        # Keep last 100
        state["downloaded_albums"] = history[-100:]
        state["last_download"] = {
            "artist": artist,
            "album": album,
            "reason": reason,
            "timestamp": datetime.now().isoformat(),
        }
        with open(STATE_PATH, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)

    def format_recommendations(self, recs: list[dict]) -> str:
        """Render recommendations as a numbered markdown list."""
        lines = ["# Music Recommendations\n"]
        lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n")
        for i, rec in enumerate(recs, 1):
            lines.append(f"{i}. **{rec['name']}**")
            if "because_of" in rec:
                lines.append(f" Similar to: {rec['because_of']}")
            lines.append("")
        return "\n".join(lines)

    def format_gaps(self, gaps: list[dict]) -> str:
        """Render library gaps as a numbered markdown list."""
        lines = ["# Library Gaps\n"]
        lines.append("Artists you listen to but don't have in Navidrome:\n")
        for i, gap in enumerate(gaps, 1):
            lines.append(f"{i}. **{gap['name']}** ({gap['playcount']} plays)")
        return "\n".join(lines)


def load_config(path: Path) -> dict:
    """Load the JSON config at *path*; print a hint and exit(1) if it is missing."""
    if not path.exists():
        print(f"Config not found: {path}")
        print("Create it with your API keys. See music_config.example.json")
        sys.exit(1)
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def main():
    """Parse command-line arguments and run the selected command."""
    parser = argparse.ArgumentParser(description="Music recommendation tool")
    parser.add_argument(
        "command", nargs="?", default="recommend",
        choices=["recommend", "gaps", "recent", "search", "test",
                 "surprise", "slskd", "download"],
        help="Command to run",
    )
    parser.add_argument("query", nargs="?", help="Search query")
    parser.add_argument("--config", type=Path, default=CONFIG_PATH,
                        help="Path to config file")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--wait", action="store_true",
                        help="Wait for download to complete (for overnight jobs)")
    parser.add_argument("--timeout", type=int, default=30,
                        help="Timeout in minutes for download completion (default: 30)")
    args = parser.parse_args()

    config = load_config(args.config)
    recommender = MusicRecommender(config)

    if args.command == "test":
        print("Testing Last.fm...", end=" ")
        try:
            recent = recommender.lastfm.get_recent_tracks(limit=1)
            print(f"OK (last track: {recent[0]['track'][:30] if recent else 'none'}...)")
        except Exception as e:
            print(f"FAILED: {e}")

        if recommender.navidrome:
            print("Testing Navidrome...", end=" ")
            if recommender.navidrome.ping():
                artists = recommender.navidrome.get_artists()
                print(f"OK ({len(artists)} artists)")
            else:
                print("FAILED")

        if recommender.slskd:
            print("Testing slskd...", end=" ")
            try:
                recommender.slskd._request("GET", "server")
                print("OK")
            except Exception as e:
                print(f"FAILED: {e}")

    elif args.command == "recommend":
        recs = recommender.get_recommendations()
        if args.json:
            print(json.dumps(recs, indent=2))
        else:
            print(recommender.format_recommendations(recs))

    elif args.command == "gaps":
        gaps = recommender.find_gaps()
        if args.json:
            print(json.dumps(gaps, indent=2))
        else:
            print(recommender.format_gaps(gaps))

    elif args.command == "recent":
        recent = recommender.lastfm.get_recent_tracks(limit=20)
        if args.json:
            print(json.dumps(recent, indent=2))
        else:
            print("# Recent Tracks\n")
            for t in recent:
                status = "🎵 NOW" if t["now_playing"] else " "
                print(f"{status} {t['artist']} - {t['track']}")

    elif args.command == "search":
        if not args.query:
            print("Search requires a query")
            sys.exit(1)
        if not recommender.navidrome:
            # Previously this case produced no output at all.
            print("Navidrome not configured")
            sys.exit(1)

        print(f"Searching Navidrome for '{args.query}'...")
        results = recommender.navidrome.search(args.query)
        if results["artists"]:
            print("\nArtists in library:")
            for a in results["artists"][:5]:
                print(f" - {a['name']}")
        if results["albums"]:
            print("\nAlbums in library:")
            for a in results["albums"][:5]:
                print(f" - {a['artist']} - {a['name']}")

    elif args.command == "surprise":
        result = recommender.surprise_download(
            wait_for_completion=args.wait,
            timeout_minutes=args.timeout,
        )
        if args.json:
            print(json.dumps(result, indent=2))
        elif result["success"]:
            status = result.get("status", "queued")
            print(f"\n✅ {'Download complete!' if status == 'completed' else 'Queued for download!'}")
            print(f" {result['artist']} - {result['album']}")
            print(f" {result['files']} files {'(FLAC)' if result['has_flac'] else '(MP3)'}")
            print(f" From: {result['source']}")
            if result.get("attempts", 1) > 1:
                print(f" (took {result['attempts']} attempts)")
        else:
            print(f"\n❌ {result['error']}")
            if "tried" in result:
                print(" Tried:")
                for t in result["tried"]:
                    print(f" - {t}")

    elif args.command == "slskd":
        if not args.query:
            print("slskd search requires a query")
            sys.exit(1)
        if not recommender.slskd:
            print("slskd not configured")
            sys.exit(1)

        print(f"Searching Soulseek for '{args.query}'...")
        search_id = recommender.slskd.search(args.query)
        if not search_id:
            # Without an id the polling calls below would hit "searches/None".
            print("Search could not be started")
            sys.exit(1)
        recommender.slskd.wait_for_search(search_id, timeout=30)
        # Brief pause to ensure /responses endpoint is populated
        time.sleep(2)
        responses = recommender.slskd.get_search_responses(search_id)

        print(f"\nFound {len(responses)} users with results:\n")
        for resp in responses[:10]:
            username = resp.get("username", "?")
            files = resp.get("files", [])
            has_slot = "✓" if resp.get("hasFreeUploadSlot") else "✗"
            print(f" {has_slot} {username}: {len(files)} files")

    elif args.command == "download":
        if not args.query:
            print("download requires 'artist - album'")
            sys.exit(1)
        if not recommender.slskd:
            print("slskd not configured")
            sys.exit(1)

        # Parse "artist - album"
        if " - " not in args.query:
            print("Format: 'Artist - Album'")
            sys.exit(1)
        artist, album = args.query.split(" - ", 1)

        print(f"Searching for: {artist} - {album}")
        match = recommender.slskd.search_and_find_album(artist, album)
        if match:
            print(f"\nBest match: {match['file_count']} files from {match['username']}")
            print(f"FLAC: {'Yes' if match['has_flac'] else 'No'}")
            print(f"Directory: {match['directory'][:60]}...")
            if recommender.slskd.download_album(match):
                print("\n✅ Download queued!")
            else:
                print("\n❌ Download failed")
        else:
            print("\n❌ No suitable results found")


if __name__ == "__main__":
    main()