summaryrefslogtreecommitdiff
path: root/music_recommender.py
diff options
context:
space:
mode:
Diffstat (limited to 'music_recommender.py')
-rwxr-xr-xmusic_recommender.py1038
1 files changed, 1038 insertions, 0 deletions
diff --git a/music_recommender.py b/music_recommender.py
new file mode 100755
index 0000000..a366f9a
--- /dev/null
+++ b/music_recommender.py
@@ -0,0 +1,1038 @@
+#!/usr/bin/env python3
+"""
+Music Recommender - Finds music you might like based on Last.fm history,
+cross-referenced with your Navidrome collection. Can auto-download via slskd.
+
+Usage:
+ python music_recommender.py recommend # Get recommendations
+ python music_recommender.py gaps # Find library gaps
+ python music_recommender.py recent # Recent scrobbles
+ python music_recommender.py search "query" # Search Navidrome
+ python music_recommender.py surprise # Pick & download an album overnight
+ python music_recommender.py slskd "query" # Search Soulseek
+ python music_recommender.py download "artist - album" # Download from Soulseek
+"""
+
+import argparse
+import hashlib
+import json
+import os
+import random
+import re
+import string
+import sys
+import time
+from datetime import datetime, timedelta
+from pathlib import Path
+from typing import Optional
+from urllib.parse import urlencode
+
+import requests
+
+# Default config path
+CONFIG_PATH = Path(__file__).parent / "music_config.json"
+STATE_PATH = Path(__file__).parent / "music_state.json"
+
+
+class LastFM:
+ """Last.fm API client."""
+
+ BASE_URL = "https://ws.audioscrobbler.com/2.0/"
+
+ def __init__(self, api_key: str, username: str):
+ self.api_key = api_key
+ self.username = username
+
+ def _request(self, method: str, **params) -> dict:
+ params.update({
+ "method": method,
+ "api_key": self.api_key,
+ "format": "json",
+ })
+ resp = requests.get(self.BASE_URL, params=params)
+ resp.raise_for_status()
+ return resp.json()
+
+ def get_recent_tracks(self, limit: int = 50) -> list[dict]:
+ """Get recently played tracks."""
+ data = self._request("user.getrecenttracks", user=self.username, limit=limit)
+ tracks = data.get("recenttracks", {}).get("track", [])
+ return [
+ {
+ "artist": t.get("artist", {}).get("#text", ""),
+ "album": t.get("album", {}).get("#text", ""),
+ "track": t.get("name", ""),
+ "now_playing": "@attr" in t and t["@attr"].get("nowplaying") == "true",
+ }
+ for t in tracks
+ ]
+
+ def get_top_artists(self, period: str = "3month", limit: int = 50) -> list[dict]:
+ """Get top artists. Period: overall, 7day, 1month, 3month, 6month, 12month."""
+ data = self._request("user.gettopartists", user=self.username, period=period, limit=limit)
+ artists = data.get("topartists", {}).get("artist", [])
+ return [
+ {
+ "name": a.get("name", ""),
+ "playcount": int(a.get("playcount", 0)),
+ "url": a.get("url", ""),
+ }
+ for a in artists
+ ]
+
+ def get_similar_artists(self, artist: str, limit: int = 10) -> list[dict]:
+ """Get artists similar to the given artist."""
+ try:
+ data = self._request("artist.getsimilar", artist=artist, limit=limit)
+ similar = data.get("similarartists", {}).get("artist", [])
+ return [
+ {
+ "name": a.get("name", ""),
+ "match": float(a.get("match", 0)),
+ "url": a.get("url", ""),
+ }
+ for a in similar
+ ]
+ except Exception:
+ return []
+
+ def get_artist_top_albums(self, artist: str, limit: int = 5) -> list[dict]:
+ """Get top albums for an artist."""
+ try:
+ data = self._request("artist.gettopalbums", artist=artist, limit=limit)
+ albums = data.get("topalbums", {}).get("album", [])
+ return [
+ {
+ "name": a.get("name", ""),
+ "artist": artist,
+ "playcount": int(a.get("playcount", 0)),
+ "url": a.get("url", ""),
+ }
+ for a in albums
+ if a.get("name") and a.get("name") != "(null)"
+ ]
+ except Exception:
+ return []
+
+ def get_recommendations(self, limit: int = 20) -> list[dict]:
+ """Get recommended artists based on listening history."""
+ top = self.get_top_artists(period="3month", limit=10)
+
+ recommendations = {}
+ for artist in top:
+ similar = self.get_similar_artists(artist["name"], limit=5)
+ for s in similar:
+ name = s["name"]
+ if name not in recommendations and name not in [a["name"] for a in top]:
+ recommendations[name] = {
+ "name": name,
+ "because_of": artist["name"],
+ "match": s["match"],
+ }
+
+ sorted_recs = sorted(recommendations.values(), key=lambda x: x["match"], reverse=True)
+ return sorted_recs[:limit]
+
+
+class Navidrome:
+ """Navidrome/Subsonic API client."""
+
+ def __init__(self, url: str, username: str, password: str):
+ self.base_url = url.rstrip("/")
+ self.username = username
+ self.password = password
+
+ def _auth_params(self) -> dict:
+ salt = "".join(random.choices(string.ascii_lowercase + string.digits, k=8))
+ token = hashlib.md5((self.password + salt).encode()).hexdigest()
+ return {
+ "u": self.username,
+ "t": token,
+ "s": salt,
+ "v": "1.16.1",
+ "c": "music_recommender",
+ "f": "json",
+ }
+
+ def _request(self, endpoint: str, **params) -> dict:
+ params.update(self._auth_params())
+ url = f"{self.base_url}/rest/{endpoint}"
+ resp = requests.get(url, params=params)
+ resp.raise_for_status()
+ data = resp.json()
+
+ sr = data.get("subsonic-response", {})
+ if sr.get("status") == "failed":
+ error = sr.get("error", {})
+ raise Exception(f"Subsonic error {error.get('code')}: {error.get('message')}")
+
+ return sr
+
+ def ping(self) -> bool:
+ try:
+ self._request("ping")
+ return True
+ except Exception:
+ return False
+
+ def get_artists(self) -> list[dict]:
+ data = self._request("getArtists")
+ artists = []
+ for index in data.get("artists", {}).get("index", []):
+ for artist in index.get("artist", []):
+ artists.append({
+ "id": artist.get("id"),
+ "name": artist.get("name"),
+ "album_count": artist.get("albumCount", 0),
+ })
+ return artists
+
+ def get_artist(self, artist_id: str) -> dict:
+ data = self._request("getArtist", id=artist_id)
+ artist = data.get("artist", {})
+ return {
+ "id": artist.get("id"),
+ "name": artist.get("name"),
+ "album_count": artist.get("albumCount", 0),
+ "albums": [
+ {
+ "id": a.get("id"),
+ "name": a.get("name"),
+ "year": a.get("year"),
+ "song_count": a.get("songCount", 0),
+ }
+ for a in artist.get("album", [])
+ ],
+ }
+
+ def search(self, query: str) -> dict:
+ data = self._request("search3", query=query)
+ result = data.get("searchResult3", {})
+ return {
+ "artists": [
+ {"id": a.get("id"), "name": a.get("name")}
+ for a in result.get("artist", [])
+ ],
+ "albums": [
+ {"id": a.get("id"), "name": a.get("name"), "artist": a.get("artist")}
+ for a in result.get("album", [])
+ ],
+ "songs": [
+ {"id": s.get("id"), "title": s.get("title"), "artist": s.get("artist"), "album": s.get("album")}
+ for s in result.get("song", [])
+ ],
+ }
+
+
+class Slskd:
+ """slskd API client for Soulseek searches and downloads."""
+
+ def __init__(self, url: str, api_key: str):
+ self.base_url = url.rstrip("/")
+ self.api_key = api_key
+ self.headers = {"X-API-Key": api_key}
+
+ def _request(self, method: str, endpoint: str, **kwargs) -> dict:
+ url = f"{self.base_url}/api/v0/{endpoint}"
+ kwargs.setdefault("headers", {}).update(self.headers)
+ resp = requests.request(method, url, **kwargs)
+ resp.raise_for_status()
+ if resp.content:
+ return resp.json()
+ return {}
+
+ def search(self, query: str, timeout: int = 30) -> str:
+ """Start a search and return the search ID."""
+ data = self._request("POST", "searches", json={"searchText": query})
+ return data.get("id")
+
+ def get_search_state(self, search_id: str) -> dict:
+ """Get search state."""
+ return self._request("GET", f"searches/{search_id}")
+
+ def wait_for_search(self, search_id: str, timeout: int = 60, min_results: int = 5, min_wait: int = 30) -> bool:
+ """Wait for search to complete or have enough results.
+
+ Args:
+ search_id: The search ID to wait for
+ timeout: Maximum time to wait
+ min_results: Return early if we have this many files
+ min_wait: Minimum time to wait before honoring isComplete with 0 results
+ """
+ start = time.time()
+ while time.time() - start < timeout:
+ state = self.get_search_state(search_id)
+ file_count = state.get("fileCount", 0)
+ elapsed = time.time() - start
+
+ # Got enough results - done
+ if file_count >= min_results:
+ return True
+
+ # Search complete with results - done
+ if state.get("isComplete") and file_count > 0:
+ return True
+
+ # Search complete with NO results - only trust after min_wait
+ # (Soulseek can mark complete before results arrive)
+ if state.get("isComplete") and elapsed >= min_wait:
+ return True
+
+ time.sleep(2)
+ return False
+
+ def get_search_responses(self, search_id: str, max_retries: int = 15, retry_delay: float = 2) -> list[dict]:
+ """Get search responses (results grouped by user).
+
+ Retries if response list is empty but state shows results exist.
+ slskd has a race condition where /responses can lag significantly
+ behind state.responseCount - sometimes 10+ seconds.
+ """
+ for attempt in range(max_retries):
+ responses = self._request("GET", f"searches/{search_id}/responses")
+ if responses:
+ return responses
+
+ # Check if state says there should be responses
+ state = self.get_search_state(search_id)
+ if state.get("responseCount", 0) == 0:
+ # Genuinely no results
+ return []
+
+ # State has responses but endpoint returned empty - wait and retry
+ time.sleep(retry_delay)
+
+ return []
+
+ def download_files(self, username: str, files: list[dict]) -> dict:
+ """Queue files for download from a user."""
+ return self._request("POST", f"transfers/downloads/{username}", json=files)
+
+ def get_downloads(self) -> list[dict]:
+ """Get current downloads."""
+ return self._request("GET", "transfers/downloads")
+
+ def search_and_find_album(self, artist: str, album: str, timeout: int = 60) -> Optional[dict]:
+ """Search for an album and return best result."""
+ query = f"{artist} {album}"
+ print(f" Searching Soulseek for: {query}")
+
+ search_id = self.search(query)
+ if not search_id:
+ return None
+
+ # Wait for results
+ self.wait_for_search(search_id, timeout=timeout, min_results=3)
+
+ responses = self.get_search_responses(search_id)
+ if not responses:
+ return None
+
+ # Score and rank results
+ best_match = None
+ best_score = 0
+
+ for response in responses:
+ username = response.get("username", "")
+ files = response.get("files", [])
+
+ # Skip users with no free slots
+ if not response.get("hasFreeUploadSlot", False):
+ continue
+
+ # Group files by directory
+ directories = {}
+ for f in files:
+ filename = f.get("filename", "")
+ # Extract directory
+ parts = filename.replace("\\", "/").rsplit("/", 1)
+ if len(parts) == 2:
+ dir_path, fname = parts
+ else:
+ dir_path, fname = "", parts[0]
+
+ if dir_path not in directories:
+ directories[dir_path] = []
+ directories[dir_path].append(f)
+
+ # Find directories that look like albums (multiple audio files)
+ for dir_path, dir_files in directories.items():
+ audio_files = [f for f in dir_files if any(
+ f.get("filename", "").lower().endswith(ext)
+ for ext in [".flac", ".mp3", ".m4a", ".ogg", ".opus"]
+ )]
+
+ if len(audio_files) < 3:
+ continue
+
+ # Score this directory
+ score = 0
+ dir_lower = dir_path.lower()
+
+ # Prefer FLAC
+ flac_count = sum(1 for f in audio_files if f.get("filename", "").lower().endswith(".flac"))
+ if flac_count == len(audio_files):
+ score += 100
+ elif flac_count > 0:
+ score += 50
+
+ # Check artist/album in path
+ if artist.lower() in dir_lower:
+ score += 30
+ if album.lower() in dir_lower:
+ score += 30
+
+ # Prefer more complete albums
+ score += min(len(audio_files), 20)
+
+ # Prefer faster users (higher upload speed)
+ score += min(response.get("uploadSpeed", 0) // 100000, 20)
+
+ if score > best_score:
+ best_score = score
+ best_match = {
+ "username": username,
+ "directory": dir_path,
+ "files": audio_files,
+ "file_count": len(audio_files),
+ "score": score,
+ "has_flac": flac_count > 0,
+ }
+
+ return best_match
+
+ def download_album(self, match: dict) -> bool:
+ """Download an album match."""
+ if not match:
+ return False
+
+ username = match["username"]
+ files = match["files"]
+
+ # Format files for download API
+ download_files = [
+ {"filename": f["filename"], "size": f.get("size", 0)}
+ for f in files
+ ]
+
+ try:
+ self.download_files(username, download_files)
+ return True
+ except Exception as e:
+ print(f" Download failed: {e}")
+ return False
+
+ def get_user_downloads(self, username: str) -> list[dict]:
+ """Get downloads from a specific user."""
+ try:
+ all_downloads = self._request("GET", "transfers/downloads")
+ # Filter by username - strip invisible Unicode chars for robust matching
+ clean = lambda s: ''.join(c for c in (s or '') if c.isprintable())
+ target = clean(username)
+ return [d for d in all_downloads if clean(d.get("username", "")) == target]
+ except Exception:
+ return []
+
+ def cancel_user_downloads(self, username: str) -> bool:
+ """Cancel all downloads from a user."""
+ try:
+ self._request("DELETE", f"transfers/downloads/{username}")
+ return True
+ except Exception:
+ return False
+
+ def wait_for_download(self, username: str, expected_files: int,
+ timeout_minutes: int = 30, check_interval: int = 60) -> bool:
+ """Wait for downloads from a user to complete."""
+ import time
+
+ start = time.time()
+ timeout_seconds = timeout_minutes * 60
+
+ while time.time() - start < timeout_seconds:
+ downloads = self.get_user_downloads(username)
+
+ if not downloads:
+ # No downloads found - might have completed and been removed
+ # Or never started
+ return False
+
+ # Check states
+ completed = sum(1 for d in downloads
+ for f in d.get("files", [])
+ if f.get("state", "").startswith("Completed"))
+ failed = sum(1 for d in downloads
+ for f in d.get("files", [])
+ if any(s in (f.get("state") or "") for s in ["Errored", "Cancelled", "TimedOut", "Rejected", "Aborted"]))
+ in_progress = sum(1 for d in downloads
+ for f in d.get("files", [])
+ if any(s in (f.get("state") or "") for s in ["Queued", "Initializing", "InProgress"]))
+
+ total = completed + failed + in_progress
+
+ print(f" Progress: {completed}/{total} complete, {failed} failed, {in_progress} in progress")
+
+ if completed >= expected_files:
+ return True
+
+ if failed > 0 and in_progress == 0:
+ # All either completed or failed, nothing in progress
+ return completed > 0
+
+ time.sleep(check_interval)
+
+ # Timeout
+ return False
+
+
+class MusicRecommender:
+ """Main recommender combining all sources."""
+
+ def __init__(self, config: dict):
+ self.config = config
+
+ self.lastfm = LastFM(
+ api_key=config["lastfm"]["api_key"],
+ username=config["lastfm"]["username"],
+ )
+
+ self.navidrome = None
+ if "navidrome" in config:
+ self.navidrome = Navidrome(
+ url=config["navidrome"]["url"],
+ username=config["navidrome"]["username"],
+ password=config["navidrome"]["password"],
+ )
+
+ self.slskd = None
+ if "slskd" in config:
+ self.slskd = Slskd(
+ url=config["slskd"]["url"],
+ api_key=config["slskd"]["api_key"],
+ )
+
+ def get_library_artists(self) -> set[str]:
+ if not self.navidrome:
+ return set()
+ artists = self.navidrome.get_artists()
+ return {a["name"].lower() for a in artists}
+
+ def get_library_albums(self) -> set[str]:
+ """Get set of 'artist - album' strings in library."""
+ if not self.navidrome:
+ return set()
+
+ albums = set()
+ artists = self.navidrome.get_artists()
+ for artist in artists:
+ try:
+ details = self.navidrome.get_artist(artist["id"])
+ for album in details.get("albums", []):
+ key = f"{artist['name'].lower()} - {album['name'].lower()}"
+ albums.add(key)
+ except Exception:
+ pass
+ return albums
+
+ def find_gaps(self) -> list[dict]:
+ """Find artists you listen to but don't have in your library."""
+ top_artists = self.lastfm.get_top_artists(period="6month", limit=50)
+ library = self.get_library_artists()
+
+ # Filter out YouTube auto-channels and other noise
+ noise_patterns = [" - topic", "official", "vevo", "cyber gardens"]
+
+ gaps = []
+ for artist in top_artists:
+ name_lower = artist["name"].lower()
+
+ # Skip noise
+ if any(p in name_lower for p in noise_patterns):
+ continue
+
+ if name_lower not in library:
+ gaps.append({
+ "name": artist["name"],
+ "playcount": artist["playcount"],
+ "reason": "listened to but not in library",
+ })
+
+ return gaps
+
+ def get_disliked_artists(self) -> set[str]:
+ """Get set of disliked artist names (lowercase) from config feedback."""
+ disliked = set()
+ feedback = self.config.get("feedback", {})
+ for entry in feedback.get("disliked", []):
+ if "artist" in entry:
+ disliked.add(entry["artist"].lower())
+ return disliked
+
+ def get_recommendations(self) -> list[dict]:
+ recs = self.lastfm.get_recommendations(limit=30)
+ library = self.get_library_artists()
+ disliked = self.get_disliked_artists()
+ filtered = [r for r in recs if r["name"].lower() not in library and r["name"].lower() not in disliked]
+ return filtered[:20]
+
+ def pick_surprise_album(self) -> Optional[dict]:
+ """Pick an album to download as a surprise."""
+ # Load state to avoid repeats
+ state = self._load_state()
+ downloaded = set(state.get("downloaded_albums", []))
+
+ # Strategy 1: Album from a gap artist (artist you listen to but don't own)
+ gaps = self.find_gaps()[:10]
+
+ # Strategy 2: Album from a recommended artist
+ recs = self.get_recommendations()[:10]
+
+ # Combine and shuffle
+ candidates = []
+
+ for gap in gaps:
+ albums = self.lastfm.get_artist_top_albums(gap["name"], limit=3)
+ for album in albums:
+ key = f"{gap['name'].lower()} - {album['name'].lower()}"
+ if key not in downloaded:
+ candidates.append({
+ "artist": gap["name"],
+ "album": album["name"],
+ "reason": f"You've played {gap['name']} {gap['playcount']} times but don't own this",
+ "priority": 2, # Higher priority for gaps
+ })
+
+ for rec in recs:
+ albums = self.lastfm.get_artist_top_albums(rec["name"], limit=2)
+ for album in albums:
+ key = f"{rec['name'].lower()} - {album['name'].lower()}"
+ if key not in downloaded:
+ candidates.append({
+ "artist": rec["name"],
+ "album": album["name"],
+ "reason": f"Similar to {rec['because_of']}",
+ "priority": 1,
+ })
+
+ if not candidates:
+ return None
+
+ # Sort by priority, then shuffle within priority
+ candidates.sort(key=lambda x: -x["priority"])
+ top_priority = candidates[0]["priority"]
+ top_candidates = [c for c in candidates if c["priority"] == top_priority]
+
+ return random.choice(top_candidates)
+
+ def surprise_download(self, max_attempts: int = 3, wait_for_completion: bool = False,
+ timeout_minutes: int = 30) -> dict:
+ """Pick and download a surprise album. Tries multiple albums if needed.
+
+ Args:
+ max_attempts: Maximum albums to try
+ wait_for_completion: If True, wait for download to complete (for overnight job)
+ timeout_minutes: How long to wait for each download before giving up
+ """
+ if not self.slskd:
+ return {"success": False, "error": "slskd not configured"}
+
+ # Get all candidates
+ state = self._load_state()
+ downloaded = set(state.get("downloaded_albums", []))
+ disliked = self.get_disliked_artists()
+
+ candidates = []
+
+ # From gaps (higher priority) - skip disliked artists
+ for gap in self.find_gaps()[:15]:
+ if gap["name"].lower() in disliked:
+ continue
+ albums = self.lastfm.get_artist_top_albums(gap["name"], limit=3)
+ for album in albums:
+ key = f"{gap['name'].lower()} - {album['name'].lower()}"
+ if key not in downloaded:
+ candidates.append({
+ "artist": gap["name"],
+ "album": album["name"],
+ "reason": f"You've played {gap['name']} {gap['playcount']} times but don't own this",
+ "priority": 2,
+ })
+
+ # From recommendations
+ for rec in self.get_recommendations()[:10]:
+ albums = self.lastfm.get_artist_top_albums(rec["name"], limit=2)
+ for album in albums:
+ key = f"{rec['name'].lower()} - {album['name'].lower()}"
+ if key not in downloaded:
+ candidates.append({
+ "artist": rec["name"],
+ "album": album["name"],
+ "reason": f"Similar to {rec['because_of']}",
+ "priority": 1,
+ })
+
+ # From Discogs labels (curated 90s labels)
+ discogs_path = Path(__file__).parent / "discogs_labels.json"
+ if discogs_path.exists():
+ try:
+ with open(discogs_path) as f:
+ discogs_data = json.load(f)
+ # Pick random releases from the pool
+ discogs_releases = discogs_data.get("releases", [])
+ random.shuffle(discogs_releases)
+ for release in discogs_releases[:20]:
+ if release['artist'].lower() in disliked:
+ continue
+ key = f"{release['artist'].lower()} - {release['album'].lower()}"
+ if key not in downloaded:
+ year_str = f" ({release['year']})" if release.get('year') else ""
+ candidates.append({
+ "artist": release["artist"],
+ "album": release["album"],
+ "reason": f"From {release['label']}{year_str}",
+ "priority": 1, # Same priority as recommendations
+ })
+ except Exception:
+ pass # Silently skip if file is corrupt
+
+ if not candidates:
+ return {"success": False, "error": "No suitable albums found"}
+
+ # Shuffle within priority groups
+ random.shuffle(candidates)
+ candidates.sort(key=lambda x: -x["priority"])
+
+ # Try up to max_attempts albums
+ tried = []
+ for pick in candidates[:max_attempts]:
+ print(f"\nšŸŽ² Trying: {pick['artist']} - {pick['album']}")
+ print(f" Reason: {pick['reason']}\n")
+
+ match = self.slskd.search_and_find_album(pick["artist"], pick["album"])
+
+ if not match:
+ print(f" āŒ Not found on Soulseek, trying next...")
+ tried.append(f"{pick['artist']} - {pick['album']} (not found)")
+ continue
+
+ print(f" Found: {match['file_count']} files from {match['username']}")
+ print(f" FLAC: {'Yes' if match['has_flac'] else 'No'}")
+
+ if not self.slskd.download_album(match):
+ tried.append(f"{pick['artist']} - {pick['album']} (queue failed)")
+ continue
+
+ print(f" āœ“ Download queued!")
+
+ # If not waiting for completion, return success after queuing
+ if not wait_for_completion:
+ self._save_downloaded(pick["artist"], pick["album"], pick.get("reason"))
+ return {
+ "success": True,
+ "artist": pick["artist"],
+ "album": pick["album"],
+ "reason": pick["reason"],
+ "files": match["file_count"],
+ "source": match["username"],
+ "has_flac": match["has_flac"],
+ "attempts": len(tried) + 1,
+ "status": "queued",
+ }
+
+ # Wait for download to complete
+ print(f" ā³ Waiting up to {timeout_minutes} minutes for download...")
+
+ if self.slskd.wait_for_download(match["username"], match["file_count"],
+ timeout_minutes=timeout_minutes):
+ self._save_downloaded(pick["artist"], pick["album"], pick.get("reason"))
+ return {
+ "success": True,
+ "artist": pick["artist"],
+ "album": pick["album"],
+ "reason": pick["reason"],
+ "files": match["file_count"],
+ "source": match["username"],
+ "has_flac": match["has_flac"],
+ "attempts": len(tried) + 1,
+ "status": "completed",
+ }
+ else:
+ # Download timed out - cancel and try next
+ print(f" ā±ļø Timeout! Cancelling and trying next album...")
+ self.slskd.cancel_user_downloads(match["username"])
+ tried.append(f"{pick['artist']} - {pick['album']} (timeout after {timeout_minutes}min)")
+ continue
+
+ # Save failed attempts to wishlist for Discogs bookmarking
+ self._save_wishlist(tried)
+
+ return {
+ "success": False,
+ "error": f"Tried {len(tried)} albums, none worked",
+ "tried": tried,
+ }
+
+ def _save_wishlist(self, albums: list[str]):
+ """Save albums that couldn't be found to wishlist + bookmarks."""
+ wishlist_path = Path(__file__).parent / "music_wishlist.md"
+
+ # Read existing wishlist
+ existing = set()
+ if wishlist_path.exists():
+ with open(wishlist_path) as f:
+ for line in f:
+ if line.startswith("- [ ] "):
+ existing.add(line.strip()[6:].split(" — ")[0])
+
+ # Add new entries to markdown wishlist
+ with open(wishlist_path, "a") as f:
+ if not wishlist_path.exists() or wishlist_path.stat().st_size == 0:
+ f.write("# Music Wishlist\n\n")
+ f.write("Albums that couldn't be found on Soulseek.\n")
+ f.write("Search Discogs/Bandcamp manually or check back later.\n\n")
+
+ for album in albums:
+ clean = album.replace(" (download failed)", "").replace(" (not found)", "").replace(" (queue failed)", "")
+ clean = clean.split(" (timeout")[0] # Remove timeout messages
+ if clean not in existing:
+ query = clean.replace(" - ", " ").replace(" ", "+")
+ discogs_url = f"https://www.discogs.com/search/?q={query}&type=release"
+ f.write(f"- [ ] {clean} — [Discogs]({discogs_url})\n")
+
+ # Also add to Floccus bookmarks
+ self._add_to_bookmarks(clean, discogs_url)
+
+ def _add_to_bookmarks(self, title: str, url: str):
+ """Add album to Floccus bookmarks in Music/Wishlist folder."""
+ import subprocess
+
+ add_bookmark_script = Path(__file__).parent / "add_bookmark_to_wishlist.js"
+ if add_bookmark_script.exists():
+ try:
+ subprocess.run(
+ ["node", str(add_bookmark_script), url, title],
+ capture_output=True,
+ timeout=30
+ )
+ print(f" šŸ“Œ Added to bookmarks: {title}")
+ except Exception as e:
+ print(f" ⚠ Bookmark add failed: {e}")
+
+ def _load_state(self) -> dict:
+ if STATE_PATH.exists():
+ with open(STATE_PATH) as f:
+ return json.load(f)
+ return {"downloaded_albums": []}
+
+ def _save_downloaded(self, artist: str, album: str, reason: str = None):
+ state = self._load_state()
+ key = f"{artist.lower()} - {album.lower()}"
+ if key not in state["downloaded_albums"]:
+ state["downloaded_albums"].append(key)
+ # Keep last 100
+ state["downloaded_albums"] = state["downloaded_albums"][-100:]
+ state["last_download"] = {
+ "artist": artist,
+ "album": album,
+ "reason": reason,
+ "timestamp": datetime.now().isoformat(),
+ }
+ with open(STATE_PATH, "w") as f:
+ json.dump(state, f, indent=2)
+
+ def format_recommendations(self, recs: list[dict]) -> str:
+ lines = ["# Music Recommendations\n"]
+ lines.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n")
+
+ for i, rec in enumerate(recs, 1):
+ lines.append(f"{i}. **{rec['name']}**")
+ if "because_of" in rec:
+ lines.append(f" Similar to: {rec['because_of']}")
+ lines.append("")
+
+ return "\n".join(lines)
+
+ def format_gaps(self, gaps: list[dict]) -> str:
+ lines = ["# Library Gaps\n"]
+ lines.append("Artists you listen to but don't have in Navidrome:\n")
+
+ for i, gap in enumerate(gaps, 1):
+ lines.append(f"{i}. **{gap['name']}** ({gap['playcount']} plays)")
+
+ return "\n".join(lines)
+
+
+def load_config(path: Path) -> dict:
+ if not path.exists():
+ print(f"Config not found: {path}")
+ print("Create it with your API keys. See music_config.example.json")
+ sys.exit(1)
+
+ with open(path) as f:
+ return json.load(f)
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Music recommendation tool")
+ parser.add_argument("command", nargs="?", default="recommend",
+ choices=["recommend", "gaps", "recent", "search", "test",
+ "surprise", "slskd", "download"],
+ help="Command to run")
+ parser.add_argument("query", nargs="?", help="Search query")
+ parser.add_argument("--config", type=Path, default=CONFIG_PATH,
+ help="Path to config file")
+ parser.add_argument("--json", action="store_true", help="Output as JSON")
+ parser.add_argument("--wait", action="store_true",
+ help="Wait for download to complete (for overnight jobs)")
+ parser.add_argument("--timeout", type=int, default=30,
+ help="Timeout in minutes for download completion (default: 30)")
+
+ args = parser.parse_args()
+ config = load_config(args.config)
+ recommender = MusicRecommender(config)
+
+ if args.command == "test":
+ print("Testing Last.fm...", end=" ")
+ try:
+ recent = recommender.lastfm.get_recent_tracks(limit=1)
+ print(f"OK (last track: {recent[0]['track'][:30] if recent else 'none'}...)")
+ except Exception as e:
+ print(f"FAILED: {e}")
+
+ if recommender.navidrome:
+ print("Testing Navidrome...", end=" ")
+ if recommender.navidrome.ping():
+ artists = recommender.navidrome.get_artists()
+ print(f"OK ({len(artists)} artists)")
+ else:
+ print("FAILED")
+
+ if recommender.slskd:
+ print("Testing slskd...", end=" ")
+ try:
+ recommender.slskd._request("GET", "server")
+ print("OK")
+ except Exception as e:
+ print(f"FAILED: {e}")
+
+ elif args.command == "recommend":
+ recs = recommender.get_recommendations()
+ if args.json:
+ print(json.dumps(recs, indent=2))
+ else:
+ print(recommender.format_recommendations(recs))
+
+ elif args.command == "gaps":
+ gaps = recommender.find_gaps()
+ if args.json:
+ print(json.dumps(gaps, indent=2))
+ else:
+ print(recommender.format_gaps(gaps))
+
+ elif args.command == "recent":
+ recent = recommender.lastfm.get_recent_tracks(limit=20)
+ if args.json:
+ print(json.dumps(recent, indent=2))
+ else:
+ print("# Recent Tracks\n")
+ for t in recent:
+ status = "šŸŽµ NOW" if t["now_playing"] else " "
+ print(f"{status} {t['artist']} - {t['track']}")
+
+ elif args.command == "search":
+ if not args.query:
+ print("Search requires a query")
+ sys.exit(1)
+
+ if recommender.navidrome:
+ print(f"Searching Navidrome for '{args.query}'...")
+ results = recommender.navidrome.search(args.query)
+ if results["artists"]:
+ print("\nArtists in library:")
+ for a in results["artists"][:5]:
+ print(f" - {a['name']}")
+ if results["albums"]:
+ print("\nAlbums in library:")
+ for a in results["albums"][:5]:
+ print(f" - {a['artist']} - {a['name']}")
+
+ elif args.command == "surprise":
+ result = recommender.surprise_download(
+ wait_for_completion=args.wait,
+ timeout_minutes=args.timeout,
+ )
+ if args.json:
+ print(json.dumps(result, indent=2))
+ else:
+ if result["success"]:
+ status = result.get("status", "queued")
+ print(f"\nāœ… {'Download complete!' if status == 'completed' else 'Queued for download!'}")
+ print(f" {result['artist']} - {result['album']}")
+ print(f" {result['files']} files {'(FLAC)' if result['has_flac'] else '(MP3)'}")
+ print(f" From: {result['source']}")
+ if result.get("attempts", 1) > 1:
+ print(f" (took {result['attempts']} attempts)")
+ else:
+ print(f"\nāŒ {result['error']}")
+ if "tried" in result:
+ print(" Tried:")
+ for t in result["tried"]:
+ print(f" - {t}")
+
+ elif args.command == "slskd":
+ if not args.query:
+ print("slskd search requires a query")
+ sys.exit(1)
+
+ if not recommender.slskd:
+ print("slskd not configured")
+ sys.exit(1)
+
+ print(f"Searching Soulseek for '{args.query}'...")
+ search_id = recommender.slskd.search(args.query)
+ recommender.slskd.wait_for_search(search_id, timeout=30)
+ # Brief pause to ensure /responses endpoint is populated
+ time.sleep(2)
+ responses = recommender.slskd.get_search_responses(search_id)
+
+ print(f"\nFound {len(responses)} users with results:\n")
+ for resp in responses[:10]:
+ username = resp.get("username", "?")
+ files = resp.get("files", [])
+ has_slot = "āœ“" if resp.get("hasFreeUploadSlot") else "āœ—"
+ print(f" {has_slot} {username}: {len(files)} files")
+
+ elif args.command == "download":
+ if not args.query:
+ print("download requires 'artist - album'")
+ sys.exit(1)
+
+ if not recommender.slskd:
+ print("slskd not configured")
+ sys.exit(1)
+
+ # Parse "artist - album"
+ if " - " in args.query:
+ artist, album = args.query.split(" - ", 1)
+ else:
+ print("Format: 'Artist - Album'")
+ sys.exit(1)
+
+ print(f"Searching for: {artist} - {album}")
+ match = recommender.slskd.search_and_find_album(artist, album)
+
+ if match:
+ print(f"\nBest match: {match['file_count']} files from {match['username']}")
+ print(f"FLAC: {'Yes' if match['has_flac'] else 'No'}")
+ print(f"Directory: {match['directory'][:60]}...")
+
+ if recommender.slskd.download_album(match):
+ print("\nāœ… Download queued!")
+ else:
+ print("\nāŒ Download failed")
+ else:
+ print("\nāŒ No suitable results found")
+
+
+if __name__ == "__main__":
+ main()