Compare commits
2 commits
6fec47c197
...
4ff2d82bd3
Author | SHA1 | Date | |
---|---|---|---|
4ff2d82bd3 | |||
818777ce98 |
16 changed files with 332 additions and 320 deletions
15
Cargo.lock
generated
15
Cargo.lock
generated
|
@ -2313,6 +2313,19 @@ dependencies = [
|
|||
"syn 1.0.109",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "testbed"
|
||||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"dotenvy",
|
||||
"env_logger",
|
||||
"rustypipe",
|
||||
"sqlx",
|
||||
"tiraya-db",
|
||||
"tiraya-extractor",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "thiserror"
|
||||
version = "1.0.48"
|
||||
|
@ -2405,11 +2418,13 @@ version = "0.0.1"
|
|||
dependencies = [
|
||||
"env_logger",
|
||||
"futures",
|
||||
"hex-literal",
|
||||
"log",
|
||||
"once_cell",
|
||||
"quick_cache",
|
||||
"regex",
|
||||
"rustypipe",
|
||||
"siphasher 1.0.0",
|
||||
"sqlx",
|
||||
"sqlx-database-tester",
|
||||
"test-log",
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "with e as (\ninsert into albums (src_id, service, name, release_date, release_date_precision,\nalbum_type, ul_artists, by_va, image_url, image_hash, hidden, dirty, album_hash)\nvalues ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)\non conflict (src_id, service) do nothing\nreturning id) select * from e union select id from albums where src_id=$1 and service=$2",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int4"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
{
|
||||
"Custom": {
|
||||
"name": "music_service",
|
||||
"kind": {
|
||||
"Enum": [
|
||||
"yt",
|
||||
"ty",
|
||||
"sp",
|
||||
"mx"
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"Text",
|
||||
"Date",
|
||||
{
|
||||
"Custom": {
|
||||
"name": "date_precision",
|
||||
"kind": {
|
||||
"Enum": [
|
||||
"year",
|
||||
"month",
|
||||
"day"
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"Custom": {
|
||||
"name": "album_type",
|
||||
"kind": {
|
||||
"Enum": [
|
||||
"album",
|
||||
"single",
|
||||
"ep",
|
||||
"mv"
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"TextArray",
|
||||
"Bool",
|
||||
"Text",
|
||||
"Bytea",
|
||||
"Bool",
|
||||
"Bool",
|
||||
"Bytea"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "20991105335209b04a47f4c2edb3590a5ebdaab114e62c4af568d77ca4d0d25a"
|
||||
}
|
|
@ -1,46 +0,0 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "with e as (\ninsert into artists (src_id, service, name, description,\nimage_url, image_hash, header_image_url, header_image_hash, subscribers, wikipedia_url,\nrelated_artists, related_playlists, top_tracks)\nvalues ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)\non conflict (src_id, service) do nothing\nreturning id) select * from e union select id from artists where src_id=$1 and service=$2",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int4"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
{
|
||||
"Custom": {
|
||||
"name": "music_service",
|
||||
"kind": {
|
||||
"Enum": [
|
||||
"yt",
|
||||
"ty",
|
||||
"sp",
|
||||
"mx"
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"Text",
|
||||
"Text",
|
||||
"Text",
|
||||
"Bytea",
|
||||
"Text",
|
||||
"Bytea",
|
||||
"Int8",
|
||||
"Text",
|
||||
"Int4Array",
|
||||
"Int4Array",
|
||||
"Int4Array"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "b54b2ed8556441b909f83fe3c1d640937405858aedc7f24a92a418af0e88c5c9"
|
||||
}
|
46
crates/db/.sqlx/query-e1a8dedae24461e7b34ca4542d3a66b0948502c300a47c2dfd83c86c1f63b875.json
generated
Normal file
46
crates/db/.sqlx/query-e1a8dedae24461e7b34ca4542d3a66b0948502c300a47c2dfd83c86c1f63b875.json
generated
Normal file
|
@ -0,0 +1,46 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "select b.id, b.src_id, b.service as \"service: _\" from albums b\njoin artists_albums arb on arb.album_id=b.id\nwhere arb.artist_id=$1 and b.dirty",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int4"
|
||||
},
|
||||
{
|
||||
"ordinal": 1,
|
||||
"name": "src_id",
|
||||
"type_info": "Text"
|
||||
},
|
||||
{
|
||||
"ordinal": 2,
|
||||
"name": "service: _",
|
||||
"type_info": {
|
||||
"Custom": {
|
||||
"name": "music_service",
|
||||
"kind": {
|
||||
"Enum": [
|
||||
"yt",
|
||||
"ty",
|
||||
"sp",
|
||||
"mx"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Int4"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
false,
|
||||
false,
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "e1a8dedae24461e7b34ca4542d3a66b0948502c300a47c2dfd83c86c1f63b875"
|
||||
}
|
|
@ -1,46 +0,0 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "with e as (\ninsert into tracks (src_id, service, name, duration, duration_ms,\n size, loudness, album_id, album_pos, ul_artists, isrc, description, primary_track)\nvalues ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)\non conflict (src_id, service) do nothing\nreturning id) select * from e union select id from tracks where src_id=$1 and service=$2",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
"ordinal": 0,
|
||||
"name": "id",
|
||||
"type_info": "Int4"
|
||||
}
|
||||
],
|
||||
"parameters": {
|
||||
"Left": [
|
||||
"Text",
|
||||
{
|
||||
"Custom": {
|
||||
"name": "music_service",
|
||||
"kind": {
|
||||
"Enum": [
|
||||
"yt",
|
||||
"ty",
|
||||
"sp",
|
||||
"mx"
|
||||
]
|
||||
}
|
||||
}
|
||||
},
|
||||
"Text",
|
||||
"Int4",
|
||||
"Bool",
|
||||
"Int8",
|
||||
"Float4",
|
||||
"Int4",
|
||||
"Int2",
|
||||
"TextArray",
|
||||
"Varchar",
|
||||
"Text",
|
||||
"Bool"
|
||||
]
|
||||
},
|
||||
"nullable": [
|
||||
null
|
||||
]
|
||||
},
|
||||
"hash": "f1cc0ae7402ec734647fc82f4aa991bd07fdf68bbd352f30e0ff53722487d4cd"
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
{
|
||||
"db_name": "PostgreSQL",
|
||||
"query": "with e as (\ninsert into playlists (src_id, service, name, description,\n owner_name, owner_url, playlist_type, image_url, image_hash, image_type)\nvalues ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)\non conflict (src_id, service) do nothing\nreturning id) select * from e union select id from playlists where src_id=$1 and service=$2",
|
||||
"query": "insert into playlists (src_id, service, name, description,\nowner_name, owner_url, playlist_type, image_url, image_hash, image_type)\nvalues ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)\nreturning id",
|
||||
"describe": {
|
||||
"columns": [
|
||||
{
|
||||
|
@ -58,8 +58,8 @@
|
|||
]
|
||||
},
|
||||
"nullable": [
|
||||
null
|
||||
false
|
||||
]
|
||||
},
|
||||
"hash": "6a8bc66ee1ab3fa2bdcfdd4333324169d064de208bfd9e8d50be1796027a7ffc"
|
||||
"hash": "fe1ab8572dc12334b4a9402d3a2c4937a4629e623963686fb5e2545b72d3cb55"
|
||||
}
|
|
@ -358,36 +358,6 @@ order by b.release_date"#,
|
|||
}
|
||||
|
||||
impl AlbumNew<'_> {
|
||||
pub async fn insert<'a, E>(&self, exec: E) -> Result<i32, DatabaseError>
|
||||
where
|
||||
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
|
||||
{
|
||||
let res = sqlx::query!(
|
||||
r#"with e as (
|
||||
insert into albums (src_id, service, name, release_date, release_date_precision,
|
||||
album_type, ul_artists, by_va, image_url, image_hash, hidden, dirty, album_hash)
|
||||
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
|
||||
on conflict (src_id, service) do nothing
|
||||
returning id) select * from e union select id from albums where src_id=$1 and service=$2"#,
|
||||
self.src_id,
|
||||
self.service as MusicService,
|
||||
self.name,
|
||||
self.release_date,
|
||||
self.release_date_precision as Option<DatePrecision>,
|
||||
self.album_type as Option<AlbumType>,
|
||||
self.ul_artists.as_deref(),
|
||||
self.by_va,
|
||||
self.image_url,
|
||||
self.image_hash,
|
||||
self.hidden,
|
||||
self.dirty,
|
||||
self.album_hash,
|
||||
)
|
||||
.fetch_one(exec)
|
||||
.await?;
|
||||
res.id.ok_or(DatabaseError::Other("got no id".into()))
|
||||
}
|
||||
|
||||
pub async fn upsert<'a, E>(&self, exec: E) -> Result<i32, DatabaseError>
|
||||
where
|
||||
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
|
||||
|
@ -674,7 +644,7 @@ mod tests {
|
|||
|
||||
// Create
|
||||
let mut c_tx = pool.begin().await.unwrap();
|
||||
let new_id = album.insert(&mut *c_tx).await.unwrap();
|
||||
let new_id = album.upsert(&mut *c_tx).await.unwrap();
|
||||
let id = Id::Db(new_id);
|
||||
Album::set_artists(new_id, &album_artists, &mut c_tx)
|
||||
.await
|
||||
|
@ -761,7 +731,7 @@ mod tests {
|
|||
service: MusicService::YouTube,
|
||||
..Default::default()
|
||||
};
|
||||
let album_id = album.insert(&pool).await.unwrap();
|
||||
let album_id = album.upsert(&pool).await.unwrap();
|
||||
assert_eq!(album_id, ids::ALBUM_ID_VAKUUM);
|
||||
}
|
||||
|
||||
|
|
|
@ -4,8 +4,8 @@ use sqlx::{types::Json, QueryBuilder};
|
|||
use time::PrimitiveDateTime;
|
||||
|
||||
use super::{
|
||||
album::AlbumSlimRow, AlbumSlim, Id, IdOwned, MusicService, PlaylistSlim, SrcId, SrcIdOwned,
|
||||
SyncData, TrackSlim, TrackSlimRow, TrackTiny,
|
||||
album::AlbumSlimRow, AlbumSlim, Id, IdOwned, InternalId, MusicService, PlaylistSlim, SrcId,
|
||||
SrcIdOwned, SyncData, TrackSlim, TrackSlimRow, TrackTiny,
|
||||
};
|
||||
use crate::{
|
||||
error::{DatabaseError, OptionalRes},
|
||||
|
@ -531,40 +531,26 @@ where art.artist_id="#,
|
|||
.await
|
||||
.map_err(DatabaseError::from)
|
||||
}
|
||||
}
|
||||
|
||||
impl ArtistNew<'_> {
|
||||
pub async fn insert<'a, E>(&self, exec: E) -> Result<i32, DatabaseError>
|
||||
/// Select the IDs of all albums from an artist that need to be fetched
|
||||
pub async fn dirty_album_ids<'a, E>(id: i32, exec: E) -> Result<Vec<InternalId>, DatabaseError>
|
||||
where
|
||||
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
|
||||
{
|
||||
let res = sqlx::query!(
|
||||
r#"with e as (
|
||||
insert into artists (src_id, service, name, description,
|
||||
image_url, image_hash, header_image_url, header_image_hash, subscribers, wikipedia_url,
|
||||
related_artists, related_playlists, top_tracks)
|
||||
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
|
||||
on conflict (src_id, service) do nothing
|
||||
returning id) select * from e union select id from artists where src_id=$1 and service=$2"#,
|
||||
self.src_id,
|
||||
self.service as MusicService,
|
||||
self.name,
|
||||
self.description,
|
||||
self.image_url,
|
||||
self.image_hash,
|
||||
self.header_image_url,
|
||||
self.header_image_hash,
|
||||
self.subscribers,
|
||||
self.wikipedia_url,
|
||||
self.related_artists.as_deref(),
|
||||
self.related_playlists.as_deref(),
|
||||
self.top_tracks.as_deref(),
|
||||
sqlx::query_as!(
|
||||
InternalId,
|
||||
r#"select b.id, b.src_id, b.service as "service: _" from albums b
|
||||
join artists_albums arb on arb.album_id=b.id
|
||||
where arb.artist_id=$1 and b.dirty"#,
|
||||
id
|
||||
)
|
||||
.fetch_one(exec)
|
||||
.await?;
|
||||
res.id.ok_or(DatabaseError::Other("got no id".into()))
|
||||
.fetch_all(exec)
|
||||
.await
|
||||
.map_err(DatabaseError::from)
|
||||
}
|
||||
}
|
||||
|
||||
impl ArtistNew<'_> {
|
||||
pub async fn upsert<'a, E>(&self, exec: E) -> Result<i32, DatabaseError>
|
||||
where
|
||||
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
|
||||
|
@ -830,7 +816,7 @@ mod tests {
|
|||
};
|
||||
|
||||
// Create
|
||||
let new_id = artist.insert(&pool).await.unwrap();
|
||||
let new_id = artist.upsert(&pool).await.unwrap();
|
||||
let id = Id::Db(new_id);
|
||||
|
||||
// Request
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::fmt::Write;
|
||||
|
||||
use serde::{de::Visitor, Deserialize, Serialize};
|
||||
use sqlx::{types::Json, Row};
|
||||
use sqlx::{types::Json, FromRow, Row};
|
||||
|
||||
mod album;
|
||||
mod artist;
|
||||
|
@ -46,6 +46,13 @@ pub struct SrcId<'a>(pub &'a str, pub MusicService);
|
|||
#[derive(Clone, PartialEq, Eq, Hash)]
|
||||
pub struct SrcIdOwned(pub String, pub MusicService);
|
||||
|
||||
#[derive(Debug, Serialize, FromRow)]
|
||||
pub struct InternalId {
|
||||
pub id: i32,
|
||||
pub src_id: String,
|
||||
pub service: MusicService,
|
||||
}
|
||||
|
||||
impl Id<'_> {
|
||||
pub fn to_owned(&self) -> IdOwned {
|
||||
match self {
|
||||
|
|
|
@ -473,12 +473,10 @@ impl PlaylistNew<'_> {
|
|||
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
|
||||
{
|
||||
let res = sqlx::query!(
|
||||
r#"with e as (
|
||||
insert into playlists (src_id, service, name, description,
|
||||
owner_name, owner_url, playlist_type, image_url, image_hash, image_type)
|
||||
r#"insert into playlists (src_id, service, name, description,
|
||||
owner_name, owner_url, playlist_type, image_url, image_hash, image_type)
|
||||
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
|
||||
on conflict (src_id, service) do nothing
|
||||
returning id) select * from e union select id from playlists where src_id=$1 and service=$2"#,
|
||||
returning id"#,
|
||||
self.src_id,
|
||||
self.service as MusicService,
|
||||
self.name,
|
||||
|
@ -492,7 +490,7 @@ returning id) select * from e union select id from playlists where src_id=$1 and
|
|||
)
|
||||
.fetch_one(exec)
|
||||
.await?;
|
||||
res.id.ok_or(DatabaseError::Other("got no id".into()))
|
||||
Ok(res.id)
|
||||
}
|
||||
|
||||
pub async fn upsert<'a, E>(&self, exec: E) -> Result<i32, DatabaseError>
|
||||
|
|
|
@ -347,36 +347,6 @@ on conflict (src_id, service) do update set track_id=excluded.track_id"#,
|
|||
}
|
||||
|
||||
impl TrackNew<'_> {
|
||||
pub async fn insert<'a, E>(&self, exec: E) -> Result<i32, DatabaseError>
|
||||
where
|
||||
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
|
||||
{
|
||||
let res = sqlx::query!(
|
||||
r#"with e as (
|
||||
insert into tracks (src_id, service, name, duration, duration_ms,
|
||||
size, loudness, album_id, album_pos, ul_artists, isrc, description, primary_track)
|
||||
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
|
||||
on conflict (src_id, service) do nothing
|
||||
returning id) select * from e union select id from tracks where src_id=$1 and service=$2"#,
|
||||
self.src_id,
|
||||
self.service as MusicService,
|
||||
self.name,
|
||||
self.duration,
|
||||
self.duration_ms,
|
||||
self.size,
|
||||
self.loudness,
|
||||
self.album_id,
|
||||
self.album_pos,
|
||||
self.ul_artists.as_deref(),
|
||||
self.isrc,
|
||||
self.description,
|
||||
self.primary_track,
|
||||
)
|
||||
.fetch_one(exec)
|
||||
.await?;
|
||||
res.id.ok_or(DatabaseError::Other("got no id".into()))
|
||||
}
|
||||
|
||||
pub async fn upsert<'a, E>(&self, exec: E) -> Result<i32, DatabaseError>
|
||||
where
|
||||
E: sqlx::Executor<'a, Database = sqlx::Postgres>,
|
||||
|
@ -699,7 +669,7 @@ mod tests {
|
|||
|
||||
// Create
|
||||
let mut c_tx = pool.begin().await.unwrap();
|
||||
let new_id = track.insert(&mut *c_tx).await.unwrap();
|
||||
let new_id = track.upsert(&mut *c_tx).await.unwrap();
|
||||
let id = Id::Db(new_id);
|
||||
Track::set_artists(new_id, &track_artists, &mut c_tx)
|
||||
.await
|
||||
|
|
|
@ -17,6 +17,8 @@ once_cell.workspace = true
|
|||
regex.workspace = true
|
||||
log.workspace = true
|
||||
quick_cache.workspace = true
|
||||
siphasher.workspace = true
|
||||
hex-literal.workspace = true
|
||||
|
||||
tiraya-db.workspace = true
|
||||
|
||||
|
|
|
@ -3,12 +3,14 @@
|
|||
pub mod error;
|
||||
mod util;
|
||||
|
||||
use std::{borrow::Cow, sync::Arc};
|
||||
use std::{borrow::Cow, hash::Hasher, sync::Arc};
|
||||
|
||||
use error::ExtractorError;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use hex_literal::hex;
|
||||
use quick_cache::sync::Cache;
|
||||
use rustypipe::client::RustyPipe;
|
||||
use siphasher::sip128::{Hasher128, SipHasher};
|
||||
use sqlx::{Pool, Postgres};
|
||||
use time::{Date, Duration, OffsetDateTime};
|
||||
use tiraya_db::{
|
||||
|
@ -40,6 +42,26 @@ struct SyncLastUpdate {
|
|||
state: LastUpdateState,
|
||||
}
|
||||
|
||||
pub struct GetResult<T> {
|
||||
pub c: T,
|
||||
pub fetched: bool,
|
||||
}
|
||||
|
||||
impl<T> GetResult<T> {
|
||||
const fn fetched(d: T) -> Self {
|
||||
Self {
|
||||
c: d,
|
||||
fetched: true,
|
||||
}
|
||||
}
|
||||
const fn stored(d: T) -> Self {
|
||||
Self {
|
||||
c: d,
|
||||
fetched: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
enum LastUpdateState {
|
||||
Empty,
|
||||
|
@ -69,7 +91,7 @@ impl Extractor {
|
|||
)
|
||||
}
|
||||
|
||||
pub async fn get_artist(&self, id: SrcId<'_>) -> Result<Artist, ExtractorError> {
|
||||
pub async fn get_artist(&self, id: SrcId<'_>) -> Result<GetResult<Artist>, ExtractorError> {
|
||||
let artist = Artist::get(id.id(), &self.db).await.to_optional().unwrap();
|
||||
let last_update = if let Some(artist) = artist {
|
||||
self.artist_cache.insert(id.to_owned(), artist.id);
|
||||
|
@ -81,7 +103,7 @@ impl Extractor {
|
|||
|
||||
// Check if artist in DB is still fresh
|
||||
if age < ARTIST_STALE {
|
||||
return Ok(artist);
|
||||
return Ok(GetResult::stored(artist));
|
||||
} else {
|
||||
match last_sync_data {
|
||||
SyncData::Version(version) => Some(SyncLastUpdate {
|
||||
|
@ -100,7 +122,7 @@ impl Extractor {
|
|||
if typ.retry(age) {
|
||||
None
|
||||
} else {
|
||||
return Ok(artist);
|
||||
return Ok(GetResult::stored(artist));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -125,10 +147,14 @@ impl Extractor {
|
|||
&self,
|
||||
id: &str,
|
||||
last_update: &Option<SyncLastUpdate>,
|
||||
) -> Result<Artist, ExtractorError> {
|
||||
) -> Result<GetResult<Artist>, ExtractorError> {
|
||||
match self._update_yt_artist(id, last_update).await {
|
||||
Ok(id) => Artist::get(Id::Db(id), &self.db)
|
||||
Ok(res) => Artist::get(Id::Db(res.c), &self.db)
|
||||
.await
|
||||
.map(|a| GetResult {
|
||||
c: a,
|
||||
fetched: res.fetched,
|
||||
})
|
||||
.map_err(ExtractorError::from),
|
||||
Err(e) => match e {
|
||||
ExtractorError::RustyPipe(e) => {
|
||||
|
@ -149,7 +175,7 @@ impl Extractor {
|
|||
&self,
|
||||
original_id: &str,
|
||||
last_update: &Option<SyncLastUpdate>,
|
||||
) -> Result<i32, ExtractorError> {
|
||||
) -> Result<GetResult<i32>, ExtractorError> {
|
||||
let mut this_update_state = None;
|
||||
|
||||
// Since the ID may change through redirects, we only fetch the feed at this
|
||||
|
@ -158,7 +184,7 @@ impl Extractor {
|
|||
let us = self.yt_artist_update_state(original_id).await?;
|
||||
if us == last_update.state {
|
||||
Artist::set_last_sync(last_update.id, us.into(), &self.db).await?;
|
||||
return Ok(last_update.id);
|
||||
return Ok(GetResult::stored(last_update.id));
|
||||
}
|
||||
this_update_state = Some(us);
|
||||
}
|
||||
|
@ -167,7 +193,7 @@ impl Extractor {
|
|||
|
||||
let top_track_ids =
|
||||
futures::stream::iter(artist.tracks.into_iter().filter(|t| !t.is_video))
|
||||
.map(|track| async move { self.import_yt_track(track).await })
|
||||
.map(|track| async move { self.import_yt_track(track, None).await })
|
||||
.buffered(CONCURRENCY)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
@ -262,7 +288,36 @@ impl Extractor {
|
|||
};
|
||||
Artist::set_last_sync(artist_id, this_update_state.into(), &self.db).await?;
|
||||
|
||||
Ok(artist_id)
|
||||
Ok(GetResult::fetched(artist_id))
|
||||
}
|
||||
|
||||
pub async fn update_yt_artist_albums(&self, id: i32) -> Result<(), ExtractorError> {
|
||||
let dirty_albums = Artist::dirty_album_ids(id, &self.db).await?;
|
||||
let more_albums = futures::stream::iter(dirty_albums)
|
||||
.map(|id| async move {
|
||||
match self.import_yt_album(&id.src_id).await {
|
||||
Ok(more) => more,
|
||||
Err(e) => {
|
||||
log::error!("could not import album [yt:{}]: {}", id.src_id, e);
|
||||
Vec::new()
|
||||
}
|
||||
}
|
||||
})
|
||||
.buffer_unordered(DB_CONCURRENCY)
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
|
||||
futures::stream::iter(more_albums.into_iter().flatten())
|
||||
.for_each_concurrent(DB_CONCURRENCY, |album| async move {
|
||||
match self.import_yt_album(&album.id).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
log::error!("could not import album [yt:{}]: {}", album.id, e);
|
||||
}
|
||||
}
|
||||
})
|
||||
.await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn yt_artist_update_state(&self, id: &str) -> Result<LastUpdateState, ExtractorError> {
|
||||
|
@ -276,43 +331,55 @@ impl Extractor {
|
|||
}
|
||||
|
||||
/// Import a single track from YouTube
|
||||
async fn import_yt_track(&self, track: rpmodel::TrackItem) -> Result<i32, ExtractorError> {
|
||||
if let Some(id) = Track::get_id(SrcId(&track.id, MusicService::YouTube), &self.db).await? {
|
||||
return Ok(id);
|
||||
async fn import_yt_track(
|
||||
&self,
|
||||
track: rpmodel::TrackItem,
|
||||
album_id: Option<i32>,
|
||||
) -> Result<i32, ExtractorError> {
|
||||
if album_id.is_none() {
|
||||
if let Some(id) =
|
||||
Track::get_id(SrcId(&track.id, MusicService::YouTube), &self.db).await?
|
||||
{
|
||||
return Ok(id);
|
||||
}
|
||||
}
|
||||
|
||||
let (artists, ul_artists) = util::split_yt_artists(track.artists);
|
||||
let image_url = util::get_image_url(&track.cover, false);
|
||||
|
||||
let album_id = if track.is_video {
|
||||
// Create a pseudo-album for music videos
|
||||
let n_album = AlbumNew {
|
||||
src_id: &track.id,
|
||||
service: MusicService::YouTube,
|
||||
name: &track.name,
|
||||
release_date: None,
|
||||
release_date_precision: None,
|
||||
album_type: Some(AlbumType::Mv),
|
||||
by_va: track.by_va,
|
||||
image_url: image_url.as_deref(),
|
||||
dirty: false,
|
||||
..Default::default()
|
||||
};
|
||||
n_album.insert(&self.db).await?
|
||||
} else if let Some(t_album) = track.album {
|
||||
let n_album = AlbumNew {
|
||||
src_id: &t_album.id,
|
||||
service: MusicService::YouTube,
|
||||
name: &t_album.name,
|
||||
image_url: image_url.as_deref(),
|
||||
..Default::default()
|
||||
};
|
||||
n_album.insert(&self.db).await?
|
||||
} else {
|
||||
return Err(ExtractorError::InvalidData {
|
||||
id: SrcIdOwned(track.id, MusicService::YouTube),
|
||||
msg: "unknown album".into(),
|
||||
});
|
||||
let album_id = match album_id {
|
||||
Some(album_id) => album_id,
|
||||
None => {
|
||||
let image_url = util::get_image_url(&track.cover, false);
|
||||
|
||||
if track.is_video {
|
||||
// Create a pseudo-album for music videos
|
||||
let n_album = AlbumNew {
|
||||
src_id: &track.id,
|
||||
service: MusicService::YouTube,
|
||||
name: &track.name,
|
||||
album_type: Some(AlbumType::Mv),
|
||||
by_va: track.by_va,
|
||||
image_url: image_url.as_deref(),
|
||||
dirty: false,
|
||||
..Default::default()
|
||||
};
|
||||
n_album.upsert(&self.db).await?
|
||||
} else if let Some(t_album) = track.album {
|
||||
let n_album = AlbumNew {
|
||||
src_id: &t_album.id,
|
||||
service: MusicService::YouTube,
|
||||
name: &t_album.name,
|
||||
image_url: image_url.as_deref(),
|
||||
..Default::default()
|
||||
};
|
||||
n_album.upsert(&self.db).await?
|
||||
} else {
|
||||
return Err(ExtractorError::InvalidData {
|
||||
id: SrcIdOwned(track.id, MusicService::YouTube),
|
||||
msg: "unknown album".into(),
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let artist_ids = self.import_yt_artist_ids(&artists).await?;
|
||||
|
@ -321,7 +388,7 @@ impl Extractor {
|
|||
src_id: &track.id,
|
||||
service: MusicService::YouTube,
|
||||
name: &track.name,
|
||||
duration: track.duration.and_then(|v| v.try_into().ok()),
|
||||
duration: track.duration.and_then(|v| (v * 1000).try_into().ok()),
|
||||
duration_ms: false,
|
||||
album_id,
|
||||
album_pos: track.track_nr.and_then(|v| v.try_into().ok()),
|
||||
|
@ -329,7 +396,7 @@ impl Extractor {
|
|||
..Default::default()
|
||||
};
|
||||
let mut tx = self.db.begin().await?;
|
||||
let track_id = n_track.insert(&mut *tx).await?;
|
||||
let track_id = n_track.upsert(&mut *tx).await?;
|
||||
Track::set_artists(track_id, &artist_ids, &mut tx).await?;
|
||||
tx.commit().await?;
|
||||
|
||||
|
@ -358,7 +425,10 @@ impl Extractor {
|
|||
name: &aid.name,
|
||||
..Default::default()
|
||||
};
|
||||
artist.insert(&self.db).await.map_err(ExtractorError::from)
|
||||
artist
|
||||
.upsert_recessive(&self.db)
|
||||
.await
|
||||
.map_err(ExtractorError::from)
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
@ -405,11 +475,7 @@ impl Extractor {
|
|||
.year
|
||||
.and_then(|y| Date::from_calendar_date(y.into(), time::Month::January, 1).ok()),
|
||||
release_date_precision: Some(DatePrecision::Year),
|
||||
album_type: Some(match album.album_type {
|
||||
rpmodel::AlbumType::Ep => AlbumType::Ep,
|
||||
rpmodel::AlbumType::Single => AlbumType::Single,
|
||||
_ => AlbumType::Album,
|
||||
}),
|
||||
album_type: Some(util::map_album_type(album.album_type)),
|
||||
ul_artists: ul_artists.as_deref(),
|
||||
by_va: album.by_va,
|
||||
image_url: image_url.as_deref(),
|
||||
|
@ -454,10 +520,62 @@ impl Extractor {
|
|||
..Default::default()
|
||||
};
|
||||
playlist_n
|
||||
.insert(&self.db)
|
||||
.upsert(&self.db)
|
||||
.await
|
||||
.map_err(ExtractorError::from)
|
||||
}
|
||||
|
||||
async fn import_yt_album(&self, id: &str) -> Result<Vec<rpmodel::AlbumItem>, ExtractorError> {
|
||||
let album = self.rp.query().music_album(id).await?;
|
||||
|
||||
let (artists, ul_artists) = util::split_yt_artists(album.artists);
|
||||
let image_url = util::get_image_url(&album.cover, false);
|
||||
|
||||
let artist_ids = self.import_yt_artist_ids(&artists).await?;
|
||||
|
||||
// Get album hash
|
||||
let mut hasher = SipHasher::new_with_key(&hex!("e0060fd1ea207d8f43d2bf9bcae63f65"));
|
||||
for track in &album.tracks {
|
||||
hasher.write(track.name.as_bytes());
|
||||
hasher.write_u32(track.duration.unwrap_or_default());
|
||||
}
|
||||
let album_hash = hasher.finish128().as_bytes();
|
||||
|
||||
// Insert album
|
||||
// TODO: accurate release date
|
||||
let album_n = AlbumNew {
|
||||
src_id: &album.id,
|
||||
service: MusicService::YouTube,
|
||||
name: &album.name,
|
||||
release_date: album
|
||||
.year
|
||||
.and_then(|y| Date::from_calendar_date(y.into(), time::Month::January, 1).ok()),
|
||||
release_date_precision: Some(DatePrecision::Year),
|
||||
album_type: Some(util::map_album_type(album.album_type)),
|
||||
ul_artists: ul_artists.as_deref(),
|
||||
by_va: album.by_va,
|
||||
image_url: image_url.as_deref(),
|
||||
album_hash: Some(&album_hash),
|
||||
..Default::default()
|
||||
};
|
||||
let mut tx = self.db.begin().await?;
|
||||
let album_id = album_n.upsert(&mut *tx).await?;
|
||||
Album::set_artists(album_id, &artist_ids, &mut tx).await?;
|
||||
tx.commit().await?;
|
||||
|
||||
// Insert tracks
|
||||
futures::stream::iter(album.tracks.into_iter().map(Ok::<_, ExtractorError>))
|
||||
.try_for_each_concurrent(DB_CONCURRENCY, |track| async move {
|
||||
self.import_yt_track(track, Some(album_id)).await?;
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Mark album clean
|
||||
Album::mark_dirty(album_id, false, &self.db).await?;
|
||||
|
||||
Ok(album.variants)
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for Extractor {
|
||||
|
@ -475,16 +593,22 @@ mod tests {
|
|||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
#[test_log::test]
|
||||
// #[test_log::test]
|
||||
async fn import_album() {
|
||||
sqlx_database_tester::dotenv::dotenv().unwrap();
|
||||
let url = std::env::var("DATABASE_URL").unwrap();
|
||||
let pool = PgPool::connect(&url).await.unwrap();
|
||||
|
||||
let extractor = Extractor::new(RustyPipe::new(), pool);
|
||||
extractor
|
||||
._update_yt_artist("UCOmHUn--16B90oW2L6FRR3A", &None)
|
||||
let artist_res = extractor
|
||||
.get_artist(SrcId("UCOmHUn--16B90oW2L6FRR3A", MusicService::YouTube))
|
||||
.await
|
||||
.unwrap();
|
||||
if artist_res.fetched {
|
||||
extractor
|
||||
.update_yt_artist_albums(artist_res.c.id)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::borrow::Cow;
|
|||
use once_cell::sync::Lazy;
|
||||
use regex::Regex;
|
||||
use rustypipe::model as rpmodel;
|
||||
use tiraya_db::models::{SyncData, SyncError};
|
||||
use tiraya_db::models::{AlbumType, SyncData, SyncError};
|
||||
|
||||
pub struct ArtistIdName {
|
||||
pub id: String,
|
||||
|
@ -79,6 +79,14 @@ pub fn rp_error_to_sync_data(error: &rustypipe::error::Error) -> Option<SyncData
|
|||
}
|
||||
}
|
||||
|
||||
pub fn map_album_type(album_type: rpmodel::AlbumType) -> AlbumType {
|
||||
match album_type {
|
||||
rpmodel::AlbumType::Ep => AlbumType::Ep,
|
||||
rpmodel::AlbumType::Single => AlbumType::Single,
|
||||
_ => AlbumType::Album,
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
|
20
crates/testbed/Cargo.toml
Normal file
20
crates/testbed/Cargo.toml
Normal file
|
@ -0,0 +1,20 @@
|
|||
[package]
|
||||
name = "testbed"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
authors.workspace = true
|
||||
license.workspace = true
|
||||
description.workspace = true
|
||||
repository.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
tiraya-extractor.workspace = true
|
||||
tiraya-db.workspace = true
|
||||
|
||||
rustypipe.workspace = true
|
||||
sqlx.workspace = true
|
||||
dotenvy.workspace = true
|
||||
tokio.workspace = true
|
||||
env_logger.workspace = true
|
27
crates/testbed/src/main.rs
Normal file
27
crates/testbed/src/main.rs
Normal file
|
@ -0,0 +1,27 @@
|
|||
use rustypipe::client::RustyPipe;
|
||||
use sqlx::PgPool;
|
||||
use tiraya_db::models::{MusicService, SrcId};
|
||||
use tiraya_extractor::Extractor;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
dotenvy::dotenv().unwrap();
|
||||
env_logger::init();
|
||||
|
||||
let url = std::env::var("DATABASE_URL").unwrap();
|
||||
let pool = PgPool::connect(&url).await.unwrap();
|
||||
let ext = Extractor::new(RustyPipe::new(), pool);
|
||||
|
||||
let mut args = std::env::args().skip(1);
|
||||
let a1 = args.next().expect("argument");
|
||||
|
||||
let artist_res = ext
|
||||
.get_artist(SrcId(&a1, MusicService::YouTube))
|
||||
.await
|
||||
.unwrap();
|
||||
if artist_res.fetched {
|
||||
ext.update_yt_artist_albums(artist_res.c.id).await.unwrap();
|
||||
}
|
||||
|
||||
dbg!(artist_res.c);
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue