spotify-genres/genres/genres_to_meta.py

"""
Store the fetched genre info (everynoise_genres.json) in the metadata files.
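
Usage:
    python genres_to_meta.py <op> [--limit N] [--year N] [--file PATH]
        [--capitalize]

Operations: genres2meta, validate, maketl, ls, missingLang, newtl, checkln,
scrapePlaylists, scrapePlaylistsYear, package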
"""
import argparse
import asyncio
import dataclasses
import json
import os
import re
import sys
from pathlib import Path
from typing import Dict, List, Optional

import aiohttp
import spotify
import yaml
from aiostream import pipe, stream
from bs4 import BeautifulSoup
from dotenv import load_dotenv

import model
import util

ROOT_GENRES = {
"blues", "children's music", "chill", "classical", "comedy", "country",
"dance", "electronica", "experimental", "folk", "hip hop", "instrumental",
"jazz", "metal", "musical", "non-music", "poetry", "pop", "r&b", "reggae",
"religious", "remix product", "rock", "rock-and-roll", "singer-songwriter",
"event", "soul", "soundtrack", "vocal", "world"
}


def genre_letter(genre_id: str) -> str:
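    """Return the grouping letter for an id, or "0" if non-alphabetic."""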
    c1 = genre_id[0]
if c1.isascii() and c1.isalpha():
return c1.lower()
return "0"
def group_genres(
genre_data: Dict[str, model.GenreMetadata]
) -> Dict[str, Dict[str, model.GenreMetadata]]:
"""Group genres by their first letter"""
grouped_genres = dict()
for genre_id, genre in genre_data.items():
lt = genre_letter(genre_id)
if lt not in grouped_genres:
grouped_genres[lt] = dict()
grouped_genres[lt][genre_id] = genre
    return grouped_genres


def genres_to_meta():
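    """Merge the scraped genre data into the per-letter metadata files."""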
with open(util.GENRE_FILE) as f:
genre_dict = json.load(f)
genre_data = model.load_genre_dict(genre_dict)
grouped_genres = group_genres(genre_data)
for lt, lt_genres in grouped_genres.items():
# Load stored metadata
md_path = util.METADATA_DIR / f"{lt}.yaml"
stored_genres = dict()
if md_path.is_file():
with md_path.open() as f:
data = yaml.safe_load(f)
stored_genres = model.load_genre_dict(data)
print(f"{lt}: {len(stored_genres)} stored")
        # Iterate through all stored genres of that letter, updating them
        # with the scraped data and marking them deprecated if they are
        # not present in the scraped data
for sg_id, sg in stored_genres.items():
if sg_id in lt_genres:
genre = lt_genres[sg_id]
sg.name = genre.name
if not sg.playlists:
sg.playlists = genre.playlists
else:
sg.playlists.update(genre.playlists)
sg.rank = genre.rank
del lt_genres[sg_id]
elif not sg.metagenre:
sg.deprecated = True
# Genres not already stored in metadata files
stored_genres.update(lt_genres)
        model.store_genres_yaml(md_path, stored_genres)


def write_meta(metadata: Dict[str, model.GenreMetadata]):
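    """Write genre metadata to the per-letter YAML files."""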
grouped_metadata = group_genres(metadata)
for lt, genres in grouped_metadata.items():
md_path = util.METADATA_DIR / f"{lt}.yaml"
        model.store_genres_yaml(md_path, genres)


def migrate_meta():
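    """Re-key the metadata by genre name, rewriting parent/alias references."""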
metadata = util.read_meta()
mig_metadata = dict()
unknown_refs = set()
def mig_ref(ref: str, own: str) -> str:
if ref == own:
print("Error: self-reference: ", ref)
        g = metadata.get(ref)
        if g:
            if g.alias:
                g = metadata.get(g.alias)
            if g:
                return g.name
        unknown_refs.add(ref)
        return ref
for old_id, genre in metadata.items():
mig_genre = model.GenreMetadata(**dataclasses.asdict(genre))
mig_genre.name = "todo"
if genre.parent is not None:
mig_genre.parent = mig_ref(genre.parent, old_id)
if genre.alias is not None:
mig_genre.alias = mig_ref(genre.alias, old_id)
mig_metadata[genre.name] = mig_genre
print("unknown refs", unknown_refs)
    write_meta(mig_metadata)


def remove_deprecated():
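    """Drop all genres marked as deprecated from the metadata files."""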
metadata = util.read_meta()
def flt(itm):
genre = itm[1]
if genre.deprecated:
print(itm[0], "deprecated")
return not genre.deprecated
metadata = dict(filter(flt, metadata.items()))
    write_meta(metadata)


def validate():
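    """
    Check metadata references: no self-references, no links to aliases,
    and every non-root, non-deprecated genre must have a parent.
    """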
metadata = util.read_meta()
n_err = 0
unknown_refs = set()
def validate_ref(ref: str, own: str) -> bool:
if ref == own:
print(f"ERR: '{own}' self-reference")
return False
g = metadata.get(ref)
if g:
if g.alias:
print(
f"ERR: '{own}' links to alias '{ref}', should be '{g.alias}'"
)
return False
return True
else:
unknown_refs.add(ref)
return False
for genre_id, genre in metadata.items():
if genre.parent:
if not validate_ref(genre.parent, genre_id):
n_err += 1
elif genre.alias:
if not validate_ref(genre.alias, genre_id):
n_err += 1
elif genre_id not in ROOT_GENRES and not genre.deprecated:
print(f"ERR: '{genre_id}' has no parent and is no root genre")
n_err += 1
if unknown_refs:
print("Unknown refs:", sorted(list(unknown_refs)))
if n_err > 0:
        raise Exception(f"Validation returned {n_err} errors")


def make_translations():
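    """
    Regenerate the main translation file and prune keys that no longer
    exist from the per-language translation files.
    """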
metadata = util.read_meta()
translations = {
genre_id: genre.name
for genre_id, genre in metadata.items() if not genre.alias
}
with open(util.TRANSLATION_FILE, "w") as f:
json.dump(translations,
f,
ensure_ascii=False,
sort_keys=True,
indent=2)
f.write("\n")
for filename in os.listdir(util.TRANSLATION_DIR):
if filename == "_main.json":
continue
tl_path = util.TRANSLATION_DIR / filename
with open(tl_path) as f:
tl = json.load(f)
modified = False
for tk in list(tl.keys()):
if tk not in translations:
del tl[tk]
modified = True
if modified:
with open(tl_path, "w") as f:
json.dump(tl, f, ensure_ascii=False, sort_keys=True, indent=2)
f.write("\n")
def list_genres(limit=None):
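    """Print the genre tree (English names) as YAML, up to `limit` levels."""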
metadata = util.read_meta()
    with open(util.TRANSLATION_FILE_EN) as f:
translation = json.load(f)
roots = []
for genre_id, genre in metadata.items():
if not genre.alias and not genre.parent:
roots.append(genre_id)
roots.sort()
tree = {}
def mkt(p, lvl=1):
subtree = {}
if limit and lvl >= limit:
return subtree
        # find children
for genre_id, genre in metadata.items():
if genre.parent == p:
subtree[translation[genre_id]] = mkt(genre_id, lvl + 1)
return subtree
for r in roots:
tree[translation[r]] = mkt(r)
    print(yaml.safe_dump(tree, allow_unicode=True, indent=2, sort_keys=True))


def find_missing_lang():
"""
List genres which have a country tag set, but no language (Exceptions for
English-speaking countries and instrumental/indigenous music)
"""
metadata = util.read_meta()
# English/multilingual countries
skip_countries = {
"US", "UK", "AU", "NZ", "IN", "SG", "IE", "SA", "BE", "CA", "LU", "LR",
"JM", "NG", "KE", "BA", "CM", "RW", "CK", "VU", "GH", "ZM", "ZW", "GY",
"KM", "TD", "MW", "MV", "PG", "SC", "SL", "SB", "SS", "NA", "FJ", "BS",
"GM", "DM", "BW", "BI", "GD", "ML", "TT", "BB"
}
skip_parents = {"instrument", "indigenous"}
for genre_id, genre in metadata.items():
        if (genre.country and genre.country not in skip_countries
                and genre.parent not in skip_parents):
lang = genre.language
g = metadata.get(genre.parent)
while not lang and g:
lang = g.language
g = metadata.get(g.parent)
if not lang:
                print(genre_id)


def check_localized_name():
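    """List genres with an (inherited) language tag but no localized name."""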
metadata = util.read_meta()
for genre_id, genre in metadata.items():
lang = genre.language
g = metadata.get(genre.parent)
while not lang and g:
lang = g.language
g = metadata.get(g.parent)
if lang and not genre.localized_name:
            print(genre_id)


def new_translation(new_file: Path, capitalize: bool):
"""
Create a new translation file with the untranslatable items
already filled in
"""
with open(util.TRANSLATION_FILE_EN) as f:
en_tl: Dict[str, str] = json.load(f)
with open(util.TRANSLATION_DIR / "de.json") as f:
de_tl: Dict[str, str] = json.load(f)
new_tl = dict()
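    # Matches the first letter of a word (preceded by a space or hyphen,
    # followed by another lowercase letter); used for title-casing below.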
upper_pattern = re.compile(r"(?<=[ \-])[a-z](?=[a-z])")
for key, english in en_tl.items():
german = de_tl[key]
english_upper = english
        for m in upper_pattern.finditer(english):
            s = m.start(0)
            english_upper = (english_upper[:s] + english_upper[s].upper() +
                             english_upper[s + 1:])
        if english_upper == german:
            new_tl[key] = english_upper if capitalize else english
with open(new_file, "w") as f:
json.dump(new_tl, f, ensure_ascii=False, sort_keys=True, indent=2)
f.write("\n")
async def scrape_playlists():
"""Scrape playlist IDs from everynoise.com"""
    spid_pattern = re.compile(r"/([A-Za-z0-9]{22})$")
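    # 27 buckets: one per letter a-z plus "0" for other ids (cf. genre_letter).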
for i in range(27):
metadata = util.read_meta_n(i)
async def scrape_playlist(g_id: str):
genre = metadata[g_id]
if genre.metagenre or genre.deprecated or genre.playlists:
return
urlid = re.sub(r"[^A-Za-z0-9]+", '', g_id)
async with aiohttp.ClientSession() as session:
async with session.get(
f"https://everynoise.com/engenremap-{urlid}.html"
) as response:
html_text = await response.text()
html = BeautifulSoup(html_text, features="lxml")
pl_links = {
"sound": f"listen to The Sound of {genre.name} on Spotify",
"intro": "listen to a shorter introduction to this genre",
"pulse": "listen to this genre's fans' current favorites",
"edge": "listen to this genre's fans' new discoveries",
"2023": "listen to this genre's fans' favorites of 2023",
}
for key, title in pl_links.items():
pl_link = html.find("a", {"title": title})
if pl_link is None:
if key == "sound":
print(html_text)
raise Exception(
f"could not find {key} link for {genre.name}")
else:
continue
pl_id = spid_pattern.search(pl_link["href"]).group(1)
if not genre.playlists:
metadata[g_id].playlists = dict()
metadata[g_id].playlists[key] = pl_id
# Remove legacy playlist id
if genre.playlist_id:
assert genre.playlist_id == genre.playlists["sound"]
metadata[g_id].playlist_id = None
print("scraped playlists for", g_id)
sx = stream.iterate(metadata.keys()) | pipe.map(scrape_playlist,
task_limit=5)
await sx
        write_meta(metadata)


async def scrape_playlists_year(year: int):
"""Scrape yearly playlists of previous years from Spotify users"""
username = f"particledetector{year}"
prefix = f"{year} in "
offset = 0
metadata = util.read_meta()
name_map = {v.name: k for k, v in metadata.items()}
while True:
async with spotify.Client(
os.getenv("SPOTIFY_CLIENT_ID"),
os.getenv("SPOTIFY_CLIENT_SECRET")) as client:
playlists = await client.http.get_playlists(username,
limit=50,
offset=offset)
items = playlists["items"]
for item in items:
name: str = item["name"]
if not name.startswith(prefix):
continue
genre_name = name.removeprefix(prefix)
if genre_name in name_map:
g_id = name_map[genre_name]
                    if metadata[g_id].playlists is None:
                        metadata[g_id].playlists = dict()
                    metadata[g_id].playlists[str(year)] = item["id"]
print("found", genre_name)
if len(items) < 50:
break
offset += len(items)
    write_meta(metadata)


def genre_tree(
    metadata: Dict[str, model.GenreMetadata]
) -> Optional[List[model.GenreMetadataTree]]:
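    """
    Build the genre tree from parent links, skipping aliases and
    deprecated genres.
    """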
def mkt(p: Optional[str]):
subtree = []
# find children
for genre_id, genre in metadata.items():
if not genre.alias and not genre.deprecated and genre.parent == p:
sg = model.GenreMetadataTree.conv(genre_id, genre)
sg.children = mkt(genre_id)
subtree.append(sg)
if len(subtree) == 0:
return None
subtree.sort()
return subtree
    return mkt(None)


def package(out_dir: Path):
"""Package genre data for consumption"""
metadata = util.read_meta()
lang_dir = out_dir / "lang"
os.makedirs(lang_dir, exist_ok=True)
tree_dir = out_dir / "tree"
os.makedirs(tree_dir, exist_ok=True)
with open(util.TRANSLATION_FILE) as f:
tl_main = json.load(f)
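    # Only package translations covering at least 70% of the main keys.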
min_keys = len(tl_main) * 0.7
with open(util.TRANSLATION_FILE_EN) as f:
tl_en = json.load(f)
# Remove redundant tags
def remove_redundant_tags(gid: str, parent: str):
genre = metadata[gid]
pg = metadata[parent]
if pg.language and pg.language == genre.language:
metadata[gid].language = None
if pg.country and pg.country == genre.country:
metadata[gid].country = None
if pg.region and pg.region == genre.region:
metadata[gid].region = None
if pg.parent:
remove_redundant_tags(gid, pg.parent)
for genre_id, genre in metadata.items():
if genre.parent:
remove_redundant_tags(genre_id, genre.parent)
# Genre database
db = {
g_id: model.GenreMetadataDB.conv(genre, tl_en.get(g_id))
for g_id, genre in metadata.items()
}
model.store_pack_json(out_dir / "genres.json", db, True)
# Filter other translations (if != English name)
for filename in os.listdir(util.TRANSLATION_DIR):
if filename == "_main.json" or filename == "en.json":
continue
tl_path = util.TRANSLATION_DIR / filename
with open(tl_path) as f:
tl = json.load(f)
if len(tl) < min_keys:
print("Skipping", filename)
continue
        filtered_tl = {k: v for k, v in tl.items() if v != tl_en.get(k)}
with open(lang_dir / filename, "w") as f:
json.dump(filtered_tl, f)
# Genre trees (one for each language)
tree = genre_tree(metadata)
def tl_tree(tree: List[model.GenreMetadataTree], tl: Dict[str, str]):
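        """Apply a translation to the tree in place, falling back to English."""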
for n in tree:
n.name = tl.get(n.id, tl_en[n.id])
if n.children:
tl_tree(n.children, tl)
for filename in os.listdir(util.TRANSLATION_DIR):
if filename == "_main.json":
continue
tl_path = util.TRANSLATION_DIR / filename
with open(tl_path) as f:
tl = json.load(f)
if len(tl) < min_keys:
continue
tl_tree(tree, tl)
        model.store_pack_json(tree_dir / f"tree.{filename}", tree)


if __name__ == "__main__":
load_dotenv()
parser = argparse.ArgumentParser()
parser.add_argument("op", type=str, help="Operation")
parser.add_argument("--limit",
type=int,
help="Limit",
default=None,
required=False)
parser.add_argument("--year",
type=int,
help="Year",
default=None,
required=False)
parser.add_argument("--file",
type=Path,
help="Limit",
default=None,
required=False)
parser.add_argument("--capitalize",
action='store_true',
help="Create capitalized translation")
args = parser.parse_args()
if args.op == "genres2meta":
genres_to_meta()
elif args.op == "validate":
validate()
elif args.op == "maketl":
make_translations()
elif args.op == "ls":
list_genres(args.limit)
elif args.op == "missingLang":
find_missing_lang()
elif args.op == "newtl":
new_translation(args.file, args.capitalize)
elif args.op == "checkln":
check_localized_name()
elif args.op == "scrapePlaylists":
        asyncio.run(scrape_playlists())
elif args.op == "scrapePlaylistsYear":
        asyncio.run(scrape_playlists_year(args.year))
elif args.op == "package":
package(args.file)
else:
sys.exit(2)