"""
Store the fetched genre info (everynoise_genres.json) in the metadata files.
"""
import asyncio
import argparse
import os
import sys
import re
import json
import dataclasses
from pathlib import Path
from typing import Dict, List, Optional

import aiohttp
from aiostream import stream, pipe
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import spotify
import yaml

import model
import util

# Genres that are allowed to have neither a parent nor an alias
# (see validate()).
ROOT_GENRES = {
    "blues", "children's music", "chill", "classical", "comedy", "country",
    "dance", "electronica", "experimental", "folk", "hip hop", "instrumental",
    "jazz", "metal", "musical", "non-music", "poetry", "pop", "r&b", "reggae",
    "religious", "remix product", "rock", "rock-and-roll",
    "singer-songwriter", "event", "soul", "soundtrack", "vocal", "world"
}


def genre_letter(id: str) -> str:
    """
    Return the bucket letter for a genre ID.

    The result is the lowercased first character if it is an ASCII letter,
    otherwise "0" (this includes the empty string).
    """
    # Slicing instead of indexing keeps an empty ID from raising IndexError.
    first = id[:1]
    if first.isascii() and first.isalpha():
        return first.lower()
    return "0"


def group_genres(
    genre_data: Dict[str, model.GenreMetadata]
) -> Dict[str, Dict[str, model.GenreMetadata]]:
    """Group genres by their first letter (see genre_letter)"""
    grouped_genres: Dict[str, Dict[str, model.GenreMetadata]] = {}
    for genre_id, genre in genre_data.items():
        grouped_genres.setdefault(genre_letter(genre_id), {})[genre_id] = genre
    return grouped_genres


def _inherited_language(metadata: Dict[str, model.GenreMetadata], genre):
    """
    Return the language of a genre, falling back to the nearest ancestor
    (following the parent references) that has a language set.

    Returns a falsy value if neither the genre nor any reachable ancestor
    has a language. Unknown parents end the search.
    """
    lang = genre.language
    parent_id = genre.parent
    # Guard against cyclic parent references, which would loop forever.
    visited = set()
    while not lang and parent_id not in visited:
        visited.add(parent_id)
        ancestor = metadata.get(parent_id)
        if ancestor is None:
            break
        lang = ancestor.language
        parent_id = ancestor.parent
    return lang


def genres_to_meta():
    """
    Merge the scraped genre file (util.GENRE_FILE) into the per-letter
    metadata YAML files in util.METADATA_DIR.

    Stored genres that are present in the scraped data get their name,
    playlists and rank updated. Stored genres that are missing from the
    scraped data are marked deprecated unless they are metagenres.
    Scraped genres that are not stored yet are added.
    """
    with open(util.GENRE_FILE, encoding="utf-8") as f:
        genre_dict = json.load(f)
    genre_data = model.load_genre_dict(genre_dict)

    grouped_genres = group_genres(genre_data)

    for lt, lt_genres in grouped_genres.items():
        # Load stored metadata
        md_path = util.METADATA_DIR / f"{lt}.yaml"
        stored_genres = dict()
        if md_path.is_file():
            with md_path.open(encoding="utf-8") as f:
                data = yaml.safe_load(f)
            stored_genres = model.load_genre_dict(data)

        print(f"{lt}: {len(stored_genres)} stored")

        # Iterate through all stored genres of that letter,
        # update them using scraped data and mark them deprecated
        # if they are not present in the scraped data
        for sg_id, sg in stored_genres.items():
            if sg_id in lt_genres:
                genre = lt_genres[sg_id]
                sg.name = genre.name
                if not sg.playlists:
                    sg.playlists = genre.playlists
                else:
                    sg.playlists.update(genre.playlists)
                sg.rank = genre.rank
                # Whatever remains in lt_genres afterwards is new
                del lt_genres[sg_id]
            elif not sg.metagenre:
                sg.deprecated = True

        # Genres not already stored in metadata files
        stored_genres.update(lt_genres)

        model.store_genres_yaml(md_path, stored_genres)


def write_meta(metadata: Dict[str, model.GenreMetadata]):
    """Write the metadata to the per-letter YAML files in util.METADATA_DIR"""
    for lt, genres in group_genres(metadata).items():
        md_path = util.METADATA_DIR / f"{lt}.yaml"
        model.store_genres_yaml(md_path, genres)


def migrate_meta():
    """
    Re-key the metadata by genre name instead of the old genre ID.

    Parent and alias references are rewritten to the name of the genre they
    point to (aliases are resolved to their target). References that cannot
    be resolved are kept unchanged and reported at the end.
    """
    metadata = util.read_meta()
    mig_metadata = dict()
    unknown_refs = set()

    def mig_ref(ref: str, own: str) -> str:
        """Translate an old reference into the name of the referenced genre"""
        if ref == own:
            print("Error: self-reference: ", ref)

        target = metadata.get(ref)
        if target is None:
            unknown_refs.add(ref)
            return ref

        if target.alias:
            resolved = metadata.get(target.alias)
            if resolved is None:
                # Dangling alias: report it instead of crashing and fall
                # back to the name of the genre that was referenced
                unknown_refs.add(target.alias)
                return target.name
            target = resolved
        return target.name

    for old_id, genre in metadata.items():
        mig_genre = model.GenreMetadata(**dataclasses.asdict(genre))
        mig_genre.name = "todo"

        if genre.parent is not None:
            mig_genre.parent = mig_ref(genre.parent, old_id)
        if genre.alias is not None:
            mig_genre.alias = mig_ref(genre.alias, old_id)

        mig_metadata[genre.name] = mig_genre

    print("unknown refs", unknown_refs)
    write_meta(mig_metadata)


def remove_deprecated():
    """Drop all deprecated genres from the metadata and write it back"""
    metadata = util.read_meta()

    kept = dict()
    for genre_id, genre in metadata.items():
        if genre.deprecated:
            print(genre_id, "deprecated")
        else:
            kept[genre_id] = genre

    write_meta(kept)


def validate():
    """
    Check the parent/alias references of all genres.

    Reported errors: self-references, references to an alias instead of its
    target, unknown references, and non-deprecated genres that have neither
    parent nor alias and are not listed in ROOT_GENRES.

    Raises:
        Exception: if at least one error was found
    """
    metadata = util.read_meta()
    n_err = 0
    unknown_refs = set()

    def validate_ref(ref: str, own: str) -> bool:
        """Return True if `ref` points to a known, non-alias, other genre"""
        if ref == own:
            print(f"ERR: '{own}' self-reference")
            return False

        g = metadata.get(ref)
        if g is None:
            unknown_refs.add(ref)
            return False
        if g.alias:
            print(
                f"ERR: '{own}' links to alias '{ref}', should be '{g.alias}'"
            )
            return False
        return True

    for genre_id, genre in metadata.items():
        if genre.parent:
            if not validate_ref(genre.parent, genre_id):
                n_err += 1
        elif genre.alias:
            if not validate_ref(genre.alias, genre_id):
                n_err += 1
        elif genre_id not in ROOT_GENRES and not genre.deprecated:
            print(f"ERR: '{genre_id}' has no parent and is no root genre")
            n_err += 1

    if unknown_refs:
        print("Unknown refs:", sorted(unknown_refs))

    if n_err > 0:
        raise Exception(f"Validation returned {n_err} errors")


def make_translations():
    """
    Regenerate the main translation file (genre ID -> name for all non-alias
    genres) and remove keys that no longer exist in it from all other
    translation files in util.TRANSLATION_DIR.
    """
    metadata = util.read_meta()

    translations = {
        genre_id: genre.name
        for genre_id, genre in metadata.items() if not genre.alias
    }

    with open(util.TRANSLATION_FILE, "w", encoding="utf-8") as f:
        json.dump(translations, f, ensure_ascii=False, sort_keys=True,
                  indent=2)
        f.write("\n")

    for filename in os.listdir(util.TRANSLATION_DIR):
        if filename == "_main.json":
            continue

        tl_path = util.TRANSLATION_DIR / filename
        with open(tl_path, encoding="utf-8") as f:
            tl = json.load(f)

        stale_keys = [tk for tk in tl if tk not in translations]
        if not stale_keys:
            # Leave untouched files alone
            continue
        for tk in stale_keys:
            del tl[tk]

        with open(tl_path, "w", encoding="utf-8") as f:
            json.dump(tl, f, ensure_ascii=False, sort_keys=True, indent=2)
            f.write("\n")


def list_genres(limit=None):
    """
    Print the genre tree (English names) as YAML.

    Args:
        limit: maximum depth of the printed tree; None/0 means unlimited
    """
    metadata = util.read_meta()
    with open(util.TRANSLATION_FILE_EN, encoding="utf-8") as f:
        translation = json.load(f)

    roots = sorted(genre_id for genre_id, genre in metadata.items()
                   if not genre.alias and not genre.parent)

    def mkt(p, lvl=1):
        """Build the subtree below genre `p`, which sits at depth `lvl`"""
        subtree = {}
        if limit and lvl >= limit:
            return subtree

        # find children
        for genre_id, genre in metadata.items():
            if genre.parent == p:
                subtree[translation[genre_id]] = mkt(genre_id, lvl + 1)
        return subtree

    tree = {translation[r]: mkt(r) for r in roots}

    print(yaml.safe_dump(tree, allow_unicode=True, indent=2, sort_keys=True))


def find_missing_lang():
    """
    List genres which have a country tag set, but no language
    (Exceptions for English-speaking countries and instrumental/indigenous music)
    """
    metadata = util.read_meta()

    # English/multilingual countries
    skip_countries = {
        "US", "UK", "AU", "NZ", "IN", "SG", "IE", "SA", "BE", "CA", "LU",
        "LR", "JM", "NG", "KE", "BA", "CM", "RW", "CK", "VU", "GH", "ZM",
        "ZW", "GY", "KM", "TD", "MW", "MV", "PG", "SC", "SL", "SB", "SS",
        "NA", "FJ", "BS", "GM", "DM", "BW", "BI", "GD", "ML", "TT", "BB"
    }
    skip_parents = {"instrument", "indigenous"}

    for genre_id, genre in metadata.items():
        if (genre.country and genre.country not in skip_countries
                and genre.parent not in skip_parents):
            if not _inherited_language(metadata, genre):
                print(genre_id)


def check_localized_name():
    """
    List genres which have a language (own or inherited from an ancestor)
    but no localized name
    """
    metadata = util.read_meta()
    for genre_id, genre in metadata.items():
        lang = _inherited_language(metadata, genre)
        if lang and not genre.localized_name:
            print(genre_id)


def new_translation(new_file: Path, capitalize: bool):
    """
    Create a new translation file with the untranslatable items
    already filled in

    An item counts as untranslatable if the German translation equals the
    English name with every word after a space or hyphen capitalized.

    Args:
        new_file: path of the translation file to create
        capitalize: store the capitalized form instead of the English one
    """
    with open(util.TRANSLATION_FILE_EN, encoding="utf-8") as f:
        en_tl: Dict[str, str] = json.load(f)
    with open(util.TRANSLATION_DIR / "de.json", encoding="utf-8") as f:
        de_tl: Dict[str, str] = json.load(f)

    new_tl = dict()
    # A lowercase letter that follows a space/hyphen and starts a word of
    # at least two letters
    upper_pattern = re.compile(r"(?<=[ \-])[a-z](?=[a-z])")

    for key, english in en_tl.items():
        # Keys without a German translation cannot be classified: skip them
        german = de_tl.get(key)
        english_upper = upper_pattern.sub(lambda m: m.group(0).upper(),
                                          english)
        if english_upper == german:
            new_tl[key] = english_upper if capitalize else english

    with open(new_file, "w", encoding="utf-8") as f:
        json.dump(new_tl, f, ensure_ascii=False, sort_keys=True, indent=2)
        f.write("\n")


async def scrape_playlists():
    """Scrape playlist IDs from everynoise.com"""
    # Spotify IDs are 22 alphanumeric characters at the end of the link.
    # ("A-z" would also match the punctuation between "Z" and "a".)
    spid_pattern = re.compile(r"/([A-Za-z0-9]{22})$")

    # One HTTP session is shared by all requests
    async with aiohttp.ClientSession() as session:
        # NOTE(review): 27 presumably covers the metadata files a-z plus
        # "0" - confirm against util.read_meta_n
        for i in range(27):
            metadata = util.read_meta_n(i)

            async def scrape_playlist(g_id: str):
                """Fetch the genre page of `g_id` and store its playlists"""
                genre = metadata[g_id]
                if genre.metagenre or genre.deprecated or genre.playlists:
                    return

                urlid = re.sub(r"[^A-Za-z0-9]+", '', g_id)
                async with session.get(
                        f"https://everynoise.com/engenremap-{urlid}.html"
                ) as response:
                    html_text = await response.text()
                html = BeautifulSoup(html_text, features="lxml")

                # Playlist key -> title attribute of the link on the page
                pl_links = {
                    "sound":
                    f"listen to The Sound of {genre.name} on Spotify",
                    "intro":
                    "listen to a shorter introduction to this genre",
                    "pulse":
                    "listen to this genre's fans' current favorites",
                    "edge": "listen to this genre's fans' new discoveries",
                    "2023": "listen to this genre's fans' favorites of 2023",
                }
                for key, title in pl_links.items():
                    pl_link = html.find("a", {"title": title})
                    if pl_link is None:
                        # Only the "sound" playlist is mandatory
                        if key == "sound":
                            print(html_text)
                            raise Exception(
                                f"could not find {key} link for {genre.name}")
                        continue

                    id_match = spid_pattern.search(pl_link["href"])
                    if id_match is None:
                        raise Exception(
                            f"could not parse {key} playlist id "
                            f"for {genre.name}")

                    if not genre.playlists:
                        genre.playlists = dict()
                    genre.playlists[key] = id_match.group(1)

                # Remove legacy playlist id
                if genre.playlist_id:
                    assert genre.playlist_id == genre.playlists["sound"]
                    genre.playlist_id = None

                print("scraped playlists for", g_id)

            sx = stream.iterate(metadata.keys()) | pipe.map(scrape_playlist,
                                                            task_limit=5)
            await sx
            write_meta(metadata)


async def scrape_playlists_year(year: int):
    """Scrape yearly playlists of previous years from Spotify users"""
    username = f"particledetector{year}"
    prefix = f"{year} in "
    page_size = 50
    offset = 0

    metadata = util.read_meta()
    name_map = {v.name: k for k, v in metadata.items()}

    # One client for all pages instead of a new one per request
    async with spotify.Client(
            os.getenv("SPOTIFY_CLIENT_ID"),
            os.getenv("SPOTIFY_CLIENT_SECRET")) as client:
        while True:
            playlists = await client.http.get_playlists(username,
                                                        limit=page_size,
                                                        offset=offset)
            items = playlists["items"]
            for item in items:
                name: str = item["name"]
                if not name.startswith(prefix):
                    continue
                genre_name = name.removeprefix(prefix)

                if genre_name in name_map:
                    genre = metadata[name_map[genre_name]]
                    # Genres may not have any playlists yet
                    if not genre.playlists:
                        genre.playlists = dict()
                    genre.playlists[str(year)] = item["id"]
                    print("found", genre_name)

            # A short page is the last one
            if len(items) < page_size:
                break
            offset += len(items)

    write_meta(metadata)


def genre_tree(
    metadata: Dict[str, model.GenreMetadata]
) -> Optional[List[model.GenreMetadataTree]]:
    """
    Build the genre tree from the parent references.

    Aliases and deprecated genres are left out. Each level is sorted.
    Returns the list of root nodes (genres without parent), or None if
    there are none; the same holds for the children of every node.
    """
    # Index the children once instead of scanning all genres for each node
    children_of: Dict[Optional[str], List[str]] = {}
    for genre_id, genre in metadata.items():
        if not genre.alias and not genre.deprecated:
            children_of.setdefault(genre.parent, []).append(genre_id)

    def mkt(p: Optional[str]):
        """Return the sorted child nodes of genre `p` or None"""
        subtree = []
        for genre_id in children_of.get(p, ()):
            sg = model.GenreMetadataTree.conv(genre_id, metadata[genre_id])
            sg.children = mkt(genre_id)
            subtree.append(sg)

        if not subtree:
            return None
        subtree.sort()
        return subtree

    return mkt(None)


def package(out_dir: Path):
    """
    Package genre data for consumption

    Writes to `out_dir`: the genre database (genres.json), the filtered
    translations (lang/) and one translated genre tree per language (tree/).
    Translation files with less than 70% of the keys of the main
    translation file are skipped.
    """
    metadata = util.read_meta()
    lang_dir = out_dir / "lang"
    os.makedirs(lang_dir, exist_ok=True)
    tree_dir = out_dir / "tree"
    os.makedirs(tree_dir, exist_ok=True)

    with open(util.TRANSLATION_FILE, encoding="utf-8") as f:
        tl_main = json.load(f)
    min_keys = len(tl_main) * 0.7

    with open(util.TRANSLATION_FILE_EN, encoding="utf-8") as f:
        tl_en = json.load(f)

    # Remove redundant tags
    def remove_redundant_tags(gid: str, parent: str):
        """Clear the tags of `gid` that one of its ancestors already has"""
        genre = metadata[gid]
        # Guard against cyclic parent references
        visited = set()
        while parent and parent not in visited:
            visited.add(parent)
            pg = metadata[parent]

            if pg.language and pg.language == genre.language:
                genre.language = None
            if pg.country and pg.country == genre.country:
                genre.country = None
            if pg.region and pg.region == genre.region:
                genre.region = None

            parent = pg.parent

    for genre_id, genre in metadata.items():
        if genre.parent:
            remove_redundant_tags(genre_id, genre.parent)

    # Genre database
    db = {
        g_id: model.GenreMetadataDB.conv(genre, tl_en.get(g_id))
        for g_id, genre in metadata.items()
    }
    model.store_pack_json(out_dir / "genres.json", db, True)

    # Filter other translations (if != English name)
    for filename in os.listdir(util.TRANSLATION_DIR):
        if filename == "_main.json" or filename == "en.json":
            continue

        tl_path = util.TRANSLATION_DIR / filename
        with open(tl_path, encoding="utf-8") as f:
            tl = json.load(f)

        if len(tl) < min_keys:
            print("Skipping", filename)
            continue

        filtered_tl = {k: v for k, v in tl.items() if v != tl_en[k]}
        with open(lang_dir / filename, "w", encoding="utf-8") as f:
            json.dump(filtered_tl, f)

    # Genre trees (one for each language)
    tree = genre_tree(metadata)

    def tl_tree(tree: List[model.GenreMetadataTree], tl: Dict[str, str]):
        """Set the node names in place, falling back to the English name"""
        for n in tree:
            n.name = tl.get(n.id, tl_en[n.id])
            if n.children:
                tl_tree(n.children, tl)

    for filename in os.listdir(util.TRANSLATION_DIR):
        if filename == "_main.json":
            continue

        tl_path = util.TRANSLATION_DIR / filename
        with open(tl_path, encoding="utf-8") as f:
            tl = json.load(f)

        if len(tl) < min_keys:
            continue

        # The tree is re-translated in place for every language
        tl_tree(tree, tl)
        model.store_pack_json(tree_dir / f"tree.{filename}", tree)


def main():
    """Parse the command line and run the requested operation"""
    load_dotenv()

    parser = argparse.ArgumentParser()
    parser.add_argument("op", type=str, help="Operation")
    parser.add_argument("--limit",
                        type=int,
                        help="Limit",
                        default=None,
                        required=False)
    parser.add_argument("--year",
                        type=int,
                        help="Year",
                        default=None,
                        required=False)
    parser.add_argument("--file",
                        type=Path,
                        help="File/directory to write to",
                        default=None,
                        required=False)
    parser.add_argument("--capitalize",
                        action='store_true',
                        help="Create capitalized translation")
    args = parser.parse_args()
    op = args.op

    # parser.error() prints the message and exits with status 2
    if op in ("newtl", "package") and args.file is None:
        parser.error(f"operation '{op}' requires --file")
    if op == "scrapePlaylistsYear" and args.year is None:
        parser.error(f"operation '{op}' requires --year")

    if op == "genres2meta":
        genres_to_meta()
    elif op == "validate":
        validate()
    elif op == "maketl":
        make_translations()
    elif op == "ls":
        list_genres(args.limit)
    elif op == "missingLang":
        find_missing_lang()
    elif op == "newtl":
        new_translation(args.file, args.capitalize)
    elif op == "checkln":
        check_localized_name()
    elif op == "scrapePlaylists":
        asyncio.run(scrape_playlists())
    elif op == "scrapePlaylistsYear":
        asyncio.run(scrape_playlists_year(args.year))
    elif op == "package":
        package(args.file)
    else:
        parser.error(f"unknown operation '{op}'")


if __name__ == "__main__":
    main()