"""
Store the fetched genre info (everynoise_genres.json) in the metadata files.
"""
import asyncio
import argparse
import os
import sys
import re
import json
import dataclasses
from pathlib import Path
from typing import Dict, List, Optional

import aiohttp
from aiostream import stream, pipe
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import spotify
import yaml

import model
import util

ROOT_GENRES = {
    "blues", "children's music", "chill", "classical", "comedy", "country",
    "dance", "electronica", "experimental", "folk", "hip hop", "instrumental",
    "jazz", "metal", "musical", "non-music", "poetry", "pop", "r&b", "reggae",
    "religious", "remix product", "rock", "rock-and-roll", "singer-songwriter",
    "event", "soul", "soundtrack", "vocal", "world"
}


def genre_letter(id: str) -> str:
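    """Return the letter bucket for a genre id ("0" if it does not start with an ASCII letter)"""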
    c1 = id[0]
    if c1.isascii() and c1.isalpha():
        return c1.lower()
    return "0"


def group_genres(
        genre_data: Dict[str, model.GenreMetadata]
) -> Dict[str, Dict[str, model.GenreMetadata]]:
    """Group genres by their first letter"""
    grouped_genres = dict()
    for genre_id, genre in genre_data.items():
        lt = genre_letter(genre_id)
        if lt not in grouped_genres:
            grouped_genres[lt] = dict()
        grouped_genres[lt][genre_id] = genre
    return grouped_genres


def genres_to_meta():
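    """Merge the scraped genre info into the per-letter metadata files"""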
    with open(util.GENRE_FILE) as f:
        genre_dict = json.load(f)
    genre_data = model.load_genre_dict(genre_dict)

    grouped_genres = group_genres(genre_data)

    for lt, lt_genres in grouped_genres.items():
        # Load stored metadata
        md_path = util.METADATA_DIR / f"{lt}.yaml"
        stored_genres = dict()
        if md_path.is_file():
            with md_path.open() as f:
                data = yaml.safe_load(f)
                stored_genres = model.load_genre_dict(data)
        print(f"{lt}: {len(stored_genres)} stored")

        # Iterate through all stored genres of that letter,
        # updating them with the scraped data and marking them deprecated
        # if they are not present in the scraped data
        for sg_id, sg in stored_genres.items():
            if sg_id in lt_genres:
                genre = lt_genres[sg_id]
                sg.name = genre.name
                if not sg.playlists:
                    sg.playlists = genre.playlists
                else:
                    sg.playlists.update(genre.playlists)
                sg.rank = genre.rank
                del lt_genres[sg_id]
            elif not sg.metagenre:
                sg.deprecated = True

        # Genres not already stored in metadata files
        stored_genres.update(lt_genres)

        model.store_genres_yaml(md_path, stored_genres)


def write_meta(metadata: Dict[str, model.GenreMetadata]):
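    """Write the given genre metadata back to the per-letter YAML files"""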
    grouped_metadata = group_genres(metadata)
    for lt, genres in grouped_metadata.items():
        md_path = util.METADATA_DIR / f"{lt}.yaml"
        model.store_genres_yaml(md_path, genres)


def migrate_meta():
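    """Re-key the stored metadata by genre name and rewrite parent/alias references to names"""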
    metadata = util.read_meta()
    mig_metadata = dict()
    unknown_refs = set()

    def mig_ref(ref: str, own: str) -> str:
        if ref == own:
            print("Error: self-reference: ", ref)

        g = metadata.get(ref)
        if g:
            if g.alias:
                g = metadata.get(g.alias)
            return g.name
        unknown_refs.add(ref)
        return ref

    for old_id, genre in metadata.items():
        mig_genre = model.GenreMetadata(**dataclasses.asdict(genre))
        mig_genre.name = "todo"

        if genre.parent is not None:
            mig_genre.parent = mig_ref(genre.parent, old_id)
        if genre.alias is not None:
            mig_genre.alias = mig_ref(genre.alias, old_id)

        mig_metadata[genre.name] = mig_genre

    print("unknown refs", unknown_refs)
    write_meta(mig_metadata)


def remove_deprecated():
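    """Drop genres marked as deprecated from the metadata files"""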
    metadata = util.read_meta()

    def flt(itm):
        genre = itm[1]
        if genre.deprecated:
            print(itm[0], "deprecated")
        return not genre.deprecated

    metadata = dict(filter(flt, metadata.items()))

    write_meta(metadata)


def validate():
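    """Check the metadata for self-references, links to aliases, unknown references and parentless non-root genres"""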
    metadata = util.read_meta()

    n_err = 0
    unknown_refs = set()

    def validate_ref(ref: str, own: str) -> bool:
        if ref == own:
            print(f"ERR: '{own}' self-reference")
            return False

        g = metadata.get(ref)
        if g:
            if g.alias:
                print(
                    f"ERR: '{own}' links to alias '{ref}', should be '{g.alias}'"
                )
                return False
            return True
        else:
            unknown_refs.add(ref)
            return False

    for genre_id, genre in metadata.items():
        if genre.parent:
            if not validate_ref(genre.parent, genre_id):
                n_err += 1
        elif genre.alias:
            if not validate_ref(genre.alias, genre_id):
                n_err += 1
        elif genre_id not in ROOT_GENRES and not genre.deprecated:
            print(f"ERR: '{genre_id}' has no parent and is not a root genre")
            n_err += 1

    if unknown_refs:
        print("Unknown refs:", sorted(list(unknown_refs)))

    if n_err > 0:
        raise Exception(f"Validation returned {n_err} errors")


def make_translations():
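    """Write the main translation file and remove stale keys from the other translation files"""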
    metadata = util.read_meta()
    translations = {
        genre_id: genre.name
        for genre_id, genre in metadata.items() if not genre.alias
    }
    with open(util.TRANSLATION_FILE, "w") as f:
        json.dump(translations,
                  f,
                  ensure_ascii=False,
                  sort_keys=True,
                  indent=2)
        f.write("\n")

    for filename in os.listdir(util.TRANSLATION_DIR):
        if filename == "_main.json":
            continue
        tl_path = util.TRANSLATION_DIR / filename
        with open(tl_path) as f:
            tl = json.load(f)
        modified = False
        for tk in list(tl.keys()):
            if tk not in translations:
                del tl[tk]
                modified = True
        if modified:
            with open(tl_path, "w") as f:
                json.dump(tl, f, ensure_ascii=False, sort_keys=True, indent=2)
                f.write("\n")


def list_genres(limit=None):
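    """Print the genre tree with English names as YAML, optionally limited to a maximum depth"""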
    metadata = util.read_meta()

    with open(util.TRANSLATION_FILE_EN) as f:
        translation = json.load(f)

    roots = []
    for genre_id, genre in metadata.items():
        if not genre.alias and not genre.parent:
            roots.append(genre_id)

    roots.sort()

    tree = {}

    def mkt(p, lvl=1):
        subtree = {}
        if limit and lvl >= limit:
            return subtree

        # find children
        # children = [genre_id for genre_id, genre in metadata.items() if genre.parent == p]
        for genre_id, genre in metadata.items():
            if genre.parent == p:
                subtree[translation[genre_id]] = mkt(genre_id, lvl + 1)
        return subtree

    for r in roots:
        tree[translation[r]] = mkt(r)

    # print(json.dumps(tree, indent=2, sort_keys=True, ensure_ascii=False))
    print(yaml.safe_dump(tree, allow_unicode=True, indent=2, sort_keys=True))


def find_missing_lang():
    """
    List genres which have a country tag set, but no language (Exceptions for
    English-speaking countries and instrumental/indigenous music)
    """
    metadata = util.read_meta()

    # English/multilingual countries
    skip_countries = {
        "US", "UK", "AU", "NZ", "IN", "SG", "IE", "SA", "BE", "CA", "LU", "LR",
        "JM", "NG", "KE", "BA", "CM", "RW", "CK", "VU", "GH", "ZM", "ZW", "GY",
        "KM", "TD", "MW", "MV", "PG", "SC", "SL", "SB", "SS", "NA", "FJ", "BS",
        "GM", "DM", "BW", "BI", "GD", "ML", "TT", "BB"
    }

    skip_parents = {"instrument", "indigenous"}

    for genre_id, genre in metadata.items():
        if genre.country and genre.country not in skip_countries and genre.parent not in skip_parents:
            lang = genre.language
            g = metadata.get(genre.parent)
            while not lang and g:
                lang = g.language
                g = metadata.get(g.parent)

            if not lang:
                print(genre_id)


def check_localized_name():
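    """List genres that have a language (own or inherited) but no localized name"""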
    metadata = util.read_meta()

    for genre_id, genre in metadata.items():
        lang = genre.language
        g = metadata.get(genre.parent)
        while not lang and g:
            lang = g.language
            g = metadata.get(g.parent)

        if lang and not genre.localized_name:
            print(genre_id)


def new_translation(new_file: Path, capitalize: bool):
    """
    Create a new translation file with the untranslatable items
    already filled in
    """
    with open(util.TRANSLATION_FILE_EN) as f:
        en_tl: Dict[str, str] = json.load(f)
    with open(util.TRANSLATION_DIR / "de.json") as f:
        de_tl: Dict[str, str] = json.load(f)

    new_tl = dict()

    upper_pattern = re.compile(r"(?<=[ \-])[a-z](?=[a-z])")

    for key, english in en_tl.items():
        german = de_tl[key]
        english_upper = english
        for m in upper_pattern.finditer(english_upper):
            s = m.start(0)
            english_upper = (english_upper[:s] + english_upper[s].upper() +
                             english_upper[s + 1:])

        if english_upper == german:
            if capitalize:
                new_tl[key] = english_upper
            else:
                new_tl[key] = english

    with open(new_file, "w") as f:
        json.dump(new_tl, f, ensure_ascii=False, sort_keys=True, indent=2)
        f.write("\n")


async def scrape_playlists():
    """Scrape playlist IDs from everynoise.com"""
    spid_pattern = re.compile(r"/([A-Za-z0-9]{22})$")

    for i in range(27):
        metadata = util.read_meta_n(i)

        async def scrape_playlist(g_id: str):
            genre = metadata[g_id]
            if genre.metagenre or genre.deprecated or genre.playlists:
                return

            urlid = re.sub(r"[^A-Za-z0-9]+", '', g_id)

            async with aiohttp.ClientSession() as session:
                async with session.get(
                        f"https://everynoise.com/engenremap-{urlid}.html"
                ) as response:
                    html_text = await response.text()
                    html = BeautifulSoup(html_text, features="lxml")

            pl_links = {
                "sound": f"listen to The Sound of {genre.name} on Spotify",
                "intro": "listen to a shorter introduction to this genre",
                "pulse": "listen to this genre's fans' current favorites",
                "edge": "listen to this genre's fans' new discoveries",
                "2023": "listen to this genre's fans' favorites of 2023",
            }

            for key, title in pl_links.items():
                pl_link = html.find("a", {"title": title})
                if pl_link is None:
                    if key == "sound":
                        print(html_text)
                        raise Exception(
                            f"could not find {key} link for {genre.name}")
                    else:
                        continue

                pl_id = spid_pattern.search(pl_link["href"]).group(1)
                if not genre.playlists:
                    metadata[g_id].playlists = dict()
                metadata[g_id].playlists[key] = pl_id

            # Remove legacy playlist id
            if genre.playlist_id:
                assert genre.playlist_id == genre.playlists["sound"]
                metadata[g_id].playlist_id = None

            print("scraped playlists for", g_id)

        sx = stream.iterate(metadata.keys()) | pipe.map(scrape_playlist,
                                                        task_limit=5)
        await sx

        write_meta(metadata)


async def scrape_playlists_year(year: int):
    """Scrape yearly playlists of previous years from Spotify users"""
    username = f"particledetector{year}"
    prefix = f"{year} in "
    offset = 0

    metadata = util.read_meta()

    name_map = {v.name: k for k, v in metadata.items()}

    while True:
        async with spotify.Client(
                os.getenv("SPOTIFY_CLIENT_ID"),
                os.getenv("SPOTIFY_CLIENT_SECRET")) as client:
            playlists = await client.http.get_playlists(username,
                                                        limit=50,
                                                        offset=offset)
            items = playlists["items"]

        for item in items:
            name: str = item["name"]
            if not name.startswith(prefix):
                continue
            genre_name = name.removeprefix(prefix)
            if genre_name in name_map:
                g_id = name_map[genre_name]
                metadata[g_id].playlists[str(year)] = item["id"]
                print("found", genre_name)

        if len(items) < 50:
            break
        offset += len(items)

    write_meta(metadata)


def genre_tree(
        metadata: Dict[str, model.GenreMetadata]
) -> Optional[List[model.GenreMetadataTree]]:
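    """Build the genre tree from parent references, skipping aliases and deprecated genres"""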

    def mkt(p: Optional[str]):
        subtree = []

        # find children
        for genre_id, genre in metadata.items():
            if not genre.alias and not genre.deprecated and genre.parent == p:
                sg = model.GenreMetadataTree.conv(genre_id, genre)
                sg.children = mkt(genre_id)
                subtree.append(sg)

        if len(subtree) == 0:
            return None
        subtree.sort()
        return subtree

    return mkt(None)


def package(out_dir: Path):
    """Package genre data for consumption"""
    metadata = util.read_meta()
    lang_dir = out_dir / "lang"
    os.makedirs(lang_dir, exist_ok=True)
    tree_dir = out_dir / "tree"
    os.makedirs(tree_dir, exist_ok=True)

    with open(util.TRANSLATION_FILE) as f:
        tl_main = json.load(f)
        min_keys = len(tl_main) * 0.7

    with open(util.TRANSLATION_FILE_EN) as f:
        tl_en = json.load(f)

    # Remove redundant tags
    def remove_redundant_tags(gid: str, parent: str):
        genre = metadata[gid]
        pg = metadata[parent]
        if pg.language and pg.language == genre.language:
            metadata[gid].language = None
        if pg.country and pg.country == genre.country:
            metadata[gid].country = None
        if pg.region and pg.region == genre.region:
            metadata[gid].region = None
        if pg.parent:
            remove_redundant_tags(gid, pg.parent)

    for genre_id, genre in metadata.items():
        if genre.parent:
            remove_redundant_tags(genre_id, genre.parent)

    # Genre database
    db = {
        g_id: model.GenreMetadataDB.conv(genre, tl_en.get(g_id))
        for g_id, genre in metadata.items()
    }

    model.store_pack_json(out_dir / "genres.json", db, True)

    # Filter other translations (if != English name)
    for filename in os.listdir(util.TRANSLATION_DIR):
        if filename == "_main.json" or filename == "en.json":
            continue
        tl_path = util.TRANSLATION_DIR / filename
        with open(tl_path) as f:
            tl = json.load(f)

        if len(tl) < min_keys:
            print("Skipping", filename)
            continue

        filtered_tl = {k: v for k, v in tl.items() if v != tl_en[k]}

        with open(lang_dir / filename, "w") as f:
            json.dump(filtered_tl, f)

    # Genre trees (one for each language)
    tree = genre_tree(metadata)

    def tl_tree(tree: List[model.GenreMetadataTree], tl: Dict[str, str]):
        for n in tree:
            n.name = tl.get(n.id, tl_en[n.id])
            if n.children:
                tl_tree(n.children, tl)

    for filename in os.listdir(util.TRANSLATION_DIR):
        if filename == "_main.json":
            continue
        tl_path = util.TRANSLATION_DIR / filename
        with open(tl_path) as f:
            tl = json.load(f)
        if len(tl) < min_keys:
            continue

        tl_tree(tree, tl)
        model.store_pack_json(tree_dir / f"tree.{filename}", tree)


if __name__ == "__main__":
    load_dotenv()

    parser = argparse.ArgumentParser()
    parser.add_argument("op", type=str, help="Operation")
    parser.add_argument("--limit",
                        type=int,
                        help="Limit",
                        default=None,
                        required=False)
    parser.add_argument("--year",
                        type=int,
                        help="Year",
                        default=None,
                        required=False)
    parser.add_argument("--file",
                        type=Path,
                        help="File",
                        default=None,
                        required=False)
    parser.add_argument("--capitalize",
                        action='store_true',
                        help="Create capitalized translation")
    args = parser.parse_args()

    if args.op == "genres2meta":
        genres_to_meta()
    elif args.op == "validate":
        validate()
    elif args.op == "maketl":
        make_translations()
    elif args.op == "ls":
        list_genres(args.limit)
    elif args.op == "missingLang":
        find_missing_lang()
    elif args.op == "newtl":
        new_translation(args.file, args.capitalize)
    elif args.op == "checkln":
        check_localized_name()
    elif args.op == "scrapePlaylists":
        asyncio.run(scrape_playlists())
    elif args.op == "scrapePlaylistsYear":
        asyncio.run(scrape_playlists_year(args.year))
    elif args.op == "package":
        package(args.file)
    else:
        sys.exit(2)