You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1238 lines
75 KiB
1238 lines
75 KiB
import os, re
|
|
from datetime import datetime, timedelta, timezone
|
|
from modules import plex, util, anidb
|
|
from modules.util import Failed, LimitReached
|
|
from plexapi.exceptions import NotFound
|
|
from plexapi.video import Movie, Show
|
|
|
|
logger = util.logger
|
|
|
|
meta_operations = [
|
|
"mass_audience_rating_update", "mass_user_rating_update", "mass_critic_rating_update",
|
|
"mass_episode_audience_rating_update", "mass_episode_user_rating_update", "mass_episode_critic_rating_update",
|
|
"mass_genre_update", "mass_content_rating_update", "mass_originally_available_update", "mass_added_at_update",
|
|
"mass_original_title_update", "mass_poster_update", "mass_background_update", "mass_studio_update"
|
|
]
|
|
name_display = {
|
|
"audienceRating": "Audience Rating",
|
|
"rating": "Critic Rating",
|
|
"userRating": "User Rating",
|
|
"originallyAvailableAt": "Originally Available Date",
|
|
"addedAt": "Added At Date",
|
|
"contentRating": "Content Rating"
|
|
}
|
|
|
|
class Operations:
|
|
def __init__(self, config, library):
|
|
self.config = config
|
|
self.library = library
|
|
|
|
def run_operations(self):
|
|
operation_start = datetime.now()
|
|
logger.info("")
|
|
logger.separator(f"{self.library.name} Library Operations")
|
|
logger.info("")
|
|
logger.debug(f"Assets For All: {self.library.assets_for_all}")
|
|
logger.debug(f"Delete Collections: {self.library.delete_collections}")
|
|
logger.debug(f"Show Unmanaged Collections: {self.library.show_unmanaged}")
|
|
logger.debug(f"Show Unconfigured Collections: {self.library.show_unconfigured}")
|
|
logger.debug(f"Mass Genre Update: {self.library.mass_genre_update}")
|
|
logger.debug(f"Mass Audience Rating Update: {self.library.mass_audience_rating_update}")
|
|
logger.debug(f"Mass Critic Rating Update: {self.library.mass_critic_rating_update}")
|
|
logger.debug(f"Mass User Rating Update: {self.library.mass_user_rating_update}")
|
|
logger.debug(f"Mass Episode Audience Rating Update: {self.library.mass_episode_audience_rating_update}")
|
|
logger.debug(f"Mass Episode Critic Rating Update: {self.library.mass_episode_critic_rating_update}")
|
|
logger.debug(f"Mass Episode User Rating Update: {self.library.mass_episode_user_rating_update}")
|
|
logger.debug(f"Mass Content Rating Update: {self.library.mass_content_rating_update}")
|
|
logger.debug(f"Mass Original Title Update: {self.library.mass_original_title_update}")
|
|
logger.debug(f"Mass Originally Available Update: {self.library.mass_originally_available_update}")
|
|
logger.debug(f"Mass Added At Update: {self.library.mass_added_at_update}")
|
|
logger.debug(f"Mass IMDb Parental Labels: {self.library.mass_imdb_parental_labels}")
|
|
logger.debug(f"Mass Poster Update: {self.library.mass_poster_update}")
|
|
logger.debug(f"Mass Background Update: {self.library.mass_background_update}")
|
|
logger.debug(f"Mass Collection Mode Update: {self.library.mass_collection_mode}")
|
|
logger.debug(f"Split Duplicates: {self.library.split_duplicates}")
|
|
logger.debug(f"Radarr Add All Existing: {self.library.radarr_add_all_existing}")
|
|
logger.debug(f"Radarr Remove by Tag: {self.library.radarr_remove_by_tag}")
|
|
logger.debug(f"Sonarr Add All Existing: {self.library.sonarr_add_all_existing}")
|
|
logger.debug(f"Sonarr Remove by Tag: {self.library.sonarr_remove_by_tag}")
|
|
logger.debug(f"Update Blank Track Titles: {self.library.update_blank_track_titles}")
|
|
logger.debug(f"Update Remove Title Parentheses: {self.library.remove_title_parentheses}")
|
|
logger.debug(f"Genre Mapper: {self.library.genre_mapper}")
|
|
logger.debug(f"Content Rating Mapper: {self.library.content_rating_mapper}")
|
|
logger.debug(f"Metadata Backup: {self.library.metadata_backup}")
|
|
logger.debug(f"Item Operation: {self.library.items_library_operation}")
|
|
logger.debug("")
|
|
|
|
def should_be_deleted(col_in, labels_in, configured_in, managed_in, less_in, ignore_smart_in):
|
|
if all((x is None for x in [configured_in, managed_in, less_in])):
|
|
return False
|
|
|
|
less_check = not ignore_smart_in if col_in.smart else True
|
|
if less_in is not None:
|
|
if less_check:
|
|
col_count = col_in.childCount if col_in.childCount is not None else 0
|
|
less_check = col_count < less_in
|
|
logger.trace(f"{col_in.title} - collection size: {col_count} < less: {less_in}, DELETE: {less_check}")
|
|
else:
|
|
logger.trace(f"{col_in.title} - skipping size check: smart - {col_in.smart}, ignore_smart - {ignore_smart_in}")
|
|
|
|
|
|
managed_check = True
|
|
if managed_in is not None:
|
|
is_managed = "PMM" in labels_in or "Kometa" in labels_in
|
|
managed_check = managed_in == is_managed
|
|
logger.trace(f"{col_in.title} - collection managed: {is_managed} vs managed: {managed_in}, DELETE: {managed_check}")
|
|
|
|
configured_check = True
|
|
if configured_in is not None:
|
|
is_configured = col_in.title in self.library.collections
|
|
configured_check = configured_in == is_configured
|
|
logger.trace(f"{col_in.title} - collection configured: {is_configured} vs configured: {configured_in}, DELETE: {configured_check}")
|
|
|
|
return all((less_check, managed_check, configured_check))
|
|
|
|
if self.library.split_duplicates:
|
|
items = self.library.search(**{"duplicate": True})
|
|
for item in items:
|
|
item.split()
|
|
logger.info(f"{item.title[:25]:<25} | Splitting")
|
|
|
|
if self.library.update_blank_track_titles:
|
|
tracks = self.library.get_all(builder_level="track")
|
|
num_edited = 0
|
|
for i, track in enumerate(tracks, 1):
|
|
logger.ghost(f"Processing Track: {i}/{len(tracks)} {track.title}")
|
|
if not track.title and track.titleSort:
|
|
track.editTitle(track.titleSort)
|
|
num_edited += 1
|
|
logger.info(f"Track: {track.titleSort} was updated with sort title")
|
|
logger.info(f"{len(tracks)} Tracks Processed; {num_edited} Blank Track Titles Updated")
|
|
|
|
if self.library.items_library_operation:
|
|
items = self.library.get_all()
|
|
radarr_adds = []
|
|
sonarr_adds = []
|
|
label_edits = {"add": {}, "remove": {}}
|
|
rating_edits = {"audienceRating": {}, "rating": {}, "userRating": {}}
|
|
genre_edits = {"add": {}, "remove": {}}
|
|
content_edits = {}
|
|
studio_edits = {}
|
|
date_edits = {"originallyAvailableAt": {}, "addedAt": {}}
|
|
remove_edits = {}
|
|
reset_edits = {}
|
|
lock_edits = {}
|
|
unlock_edits = {}
|
|
ep_rating_edits = {"audienceRating": {}, "rating": {}, "userRating": {}}
|
|
ep_remove_edits = {}
|
|
ep_reset_edits = {}
|
|
ep_lock_edits = {}
|
|
ep_unlock_edits = {}
|
|
|
|
if self.library.assets_for_all and not self.library.asset_directory:
|
|
logger.error("Asset Error: No Asset Directory for Assets For All")
|
|
|
|
for i, item in enumerate(items, 1):
|
|
logger.info("")
|
|
logger.info(f"Processing: {i}/{len(items)} {item.title}")
|
|
try:
|
|
item = self.library.reload(item)
|
|
except Failed as e:
|
|
logger.error(e)
|
|
continue
|
|
current_labels = [la.tag for la in self.library.item_labels(item)] if self.library.label_operations else []
|
|
|
|
if self.library.assets_for_all and self.library.asset_directory:
|
|
self.library.find_and_upload_assets(item, current_labels)
|
|
|
|
locked_fields = [f.name for f in item.fields if f.locked]
|
|
|
|
tmdb_id, tvdb_id, imdb_id = self.library.get_ids(item)
|
|
|
|
item_edits = ""
|
|
|
|
if self.library.remove_title_parentheses:
|
|
if not any([f.name == "title" and f.locked for f in item.fields]) and item.title.endswith(")"):
|
|
new_title = re.sub(" \\(\\w+\\)$", "", item.title)
|
|
item.editTitle(new_title)
|
|
item_edits += f"\nUpdated Title: {item.title[:25]:<25} | {new_title}"
|
|
|
|
if self.library.mass_imdb_parental_labels:
|
|
try:
|
|
if self.library.mass_imdb_parental_labels == "remove":
|
|
parental_labels = []
|
|
else:
|
|
parental_guide = self.config.IMDb.parental_guide(imdb_id)
|
|
parental_labels = [f"{k}:{v}" for k, v in parental_guide.items() if v and v not in util.parental_levels[self.library.mass_imdb_parental_labels]]
|
|
add_labels = [la for la in parental_labels if la not in current_labels]
|
|
remove_labels = [la for la in current_labels if la in util.parental_labels and la not in parental_labels]
|
|
for label_list, edit_type in [(add_labels, "add"), (remove_labels, "remove")]:
|
|
if label_list:
|
|
for label in label_list:
|
|
if label not in label_edits[edit_type]:
|
|
label_edits[edit_type][label] = []
|
|
label_edits[edit_type][label].append(item.ratingKey)
|
|
item_edits += f"\n{edit_type.capitalize()} IMDb Parental Labels (Batched) | {', '.join(label_list)}"
|
|
except Failed:
|
|
pass
|
|
if item.locations:
|
|
path = os.path.dirname(str(item.locations[0])) if self.library.is_movie else str(item.locations[0])
|
|
if self.library.Radarr and self.library.radarr_add_all_existing and tmdb_id:
|
|
path = path.replace(self.library.Radarr.plex_path, self.library.Radarr.radarr_path)
|
|
path = path[:-1] if path.endswith(('/', '\\')) else path
|
|
radarr_adds.append((tmdb_id, path))
|
|
if self.library.Sonarr and self.library.sonarr_add_all_existing and tvdb_id:
|
|
path = path.replace(self.library.Sonarr.plex_path, self.library.Sonarr.sonarr_path)
|
|
path = path[:-1] if path.endswith(("/", "\\")) else path
|
|
sonarr_adds.append((tvdb_id, path))
|
|
|
|
_trakt_ratings = None
|
|
def trakt_ratings():
|
|
nonlocal _trakt_ratings
|
|
if _trakt_ratings is None:
|
|
_trakt_ratings = self.config.Trakt.user_ratings(self.library.is_movie)
|
|
if not _trakt_ratings:
|
|
raise Failed
|
|
return _trakt_ratings
|
|
|
|
_tmdb_obj = None
|
|
def tmdb_obj():
|
|
nonlocal _tmdb_obj
|
|
if _tmdb_obj is None:
|
|
_tmdb_obj = False
|
|
try:
|
|
_item = self.config.TMDb.get_item(item, tmdb_id, tvdb_id, imdb_id, is_movie=self.library.is_movie)
|
|
if _item:
|
|
_tmdb_obj = _item
|
|
except Failed as err:
|
|
logger.error(str(err))
|
|
if not _tmdb_obj:
|
|
raise Failed
|
|
return _tmdb_obj
|
|
|
|
_omdb_obj = None
|
|
def omdb_obj():
|
|
nonlocal _omdb_obj
|
|
if _omdb_obj is None:
|
|
_omdb_obj = False
|
|
if self.config.OMDb.limit is not False:
|
|
logger.error("Daily OMDb Limit Reached")
|
|
elif not imdb_id:
|
|
logger.info(f"No IMDb ID for Guid: {item.guid}")
|
|
else:
|
|
try:
|
|
_omdb_obj = self.config.OMDb.get_omdb(imdb_id)
|
|
except Failed as err:
|
|
logger.error(str(err))
|
|
except Exception:
|
|
logger.error(f"IMDb ID: {imdb_id}")
|
|
raise
|
|
if not _omdb_obj:
|
|
raise Failed
|
|
return _omdb_obj
|
|
|
|
_tvdb_obj = None
|
|
def tvdb_obj():
|
|
nonlocal _tvdb_obj
|
|
if _tvdb_obj is None:
|
|
_tvdb_obj = False
|
|
if tvdb_id:
|
|
try:
|
|
_tvdb_obj = self.config.TVDb.get_tvdb_obj(tvdb_id, is_movie=self.library.is_movie)
|
|
except Failed as err:
|
|
logger.error(str(err))
|
|
else:
|
|
logger.info(f"No TVDb ID for Guid: {item.guid}")
|
|
if not _tvdb_obj:
|
|
raise Failed
|
|
return _tvdb_obj
|
|
|
|
_mdb_obj = None
|
|
def mdb_obj():
|
|
nonlocal _mdb_obj
|
|
if _mdb_obj is None:
|
|
_mdb_obj = False
|
|
if self.config.MDBList.limit is False:
|
|
if self.library.is_show and tvdb_id:
|
|
try:
|
|
_mdb_obj = self.config.MDBList.get_series(tvdb_id)
|
|
except LimitReached as err:
|
|
logger.debug(err)
|
|
except Failed as err:
|
|
logger.error(str(err))
|
|
except Exception:
|
|
logger.trace(f"TVDb ID: {tvdb_id}")
|
|
raise
|
|
if self.library.is_movie and tmdb_id:
|
|
try:
|
|
_mdb_obj = self.config.MDBList.get_movie(tmdb_id)
|
|
except LimitReached as err:
|
|
logger.debug(err)
|
|
except Failed as err:
|
|
logger.error(str(err))
|
|
except Exception:
|
|
logger.trace(f"TMDb ID: {tmdb_id}")
|
|
raise
|
|
if imdb_id and not _mdb_obj:
|
|
try:
|
|
_mdb_obj = self.config.MDBList.get_imdb(imdb_id)
|
|
except LimitReached as err:
|
|
logger.debug(err)
|
|
except Failed as err:
|
|
logger.error(str(err))
|
|
except Exception:
|
|
logger.trace(f"IMDb ID: {imdb_id}")
|
|
raise
|
|
if not _mdb_obj:
|
|
logger.warning(f"No MdbItem for {item.title} (Guid: {item.guid})")
|
|
if not _mdb_obj:
|
|
raise Failed
|
|
return _mdb_obj
|
|
|
|
anidb_id = None
|
|
def get_anidb_id():
|
|
temp_id = self.config.Convert.ids_to_anidb(self.library, item.ratingKey, tvdb_id, imdb_id, tmdb_id)
|
|
return temp_id if temp_id else False
|
|
|
|
_anidb_obj = None
|
|
def anidb_obj():
|
|
nonlocal anidb_id, _anidb_obj
|
|
if _anidb_obj is None:
|
|
_anidb_obj = False
|
|
if anidb_id is None:
|
|
anidb_id = get_anidb_id()
|
|
if anidb_id:
|
|
try:
|
|
_anidb_obj = self.config.AniDB.get_anime(anidb_id)
|
|
except Failed as err:
|
|
logger.error(str(err))
|
|
else:
|
|
logger.warning(f"No AniDB ID for Guid: {item.guid}")
|
|
if not _anidb_obj:
|
|
raise Failed
|
|
return _anidb_obj
|
|
|
|
_mal_obj = None
|
|
def mal_obj():
|
|
nonlocal anidb_id, _mal_obj
|
|
if _mal_obj is None:
|
|
_mal_obj = False
|
|
if anidb_id is None:
|
|
anidb_id = get_anidb_id()
|
|
mal_id = None
|
|
if item.ratingKey in self.library.reverse_mal:
|
|
mal_id = self.library.reverse_mal[item.ratingKey]
|
|
elif not anidb_id:
|
|
logger.warning(f"Convert Warning: No AniDB ID to Convert to MyAnimeList ID for Guid: {item.guid}")
|
|
else:
|
|
try:
|
|
mal_id = self.config.Convert.anidb_to_mal(anidb_id)
|
|
except Failed as err:
|
|
logger.warning(f"{err} of Guid: {item.guid}")
|
|
if mal_id:
|
|
try:
|
|
_mal_obj = self.config.MyAnimeList.get_anime(mal_id)
|
|
except Failed as err:
|
|
logger.error(str(err))
|
|
if not _mal_obj:
|
|
raise Failed
|
|
return _mal_obj
|
|
|
|
for attribute, item_attr in [
|
|
(self.library.mass_audience_rating_update, "audienceRating"),
|
|
(self.library.mass_critic_rating_update, "rating"),
|
|
(self.library.mass_user_rating_update, "userRating")
|
|
]:
|
|
if attribute:
|
|
current = getattr(item, item_attr)
|
|
for option in attribute:
|
|
if option in ["lock", "remove"]:
|
|
if option == "remove" and current:
|
|
if item_attr not in remove_edits:
|
|
remove_edits[item_attr] = []
|
|
remove_edits[item_attr].append(item.ratingKey)
|
|
item_edits += f"\nRemove {name_display[item_attr]} (Batched)"
|
|
elif item_attr not in locked_fields:
|
|
if item_attr not in lock_edits:
|
|
lock_edits[item_attr] = []
|
|
lock_edits[item_attr].append(item.ratingKey)
|
|
item_edits += f"\nLock {name_display[item_attr]} (Batched)"
|
|
break
|
|
elif option in ["unlock", "reset"]:
|
|
if option == "reset" and current:
|
|
if item_attr not in reset_edits:
|
|
reset_edits[item_attr] = []
|
|
reset_edits[item_attr].append(item.ratingKey)
|
|
item_edits += f"\nReset {name_display[item_attr]} (Batched)"
|
|
elif item_attr in locked_fields:
|
|
if item_attr not in unlock_edits:
|
|
unlock_edits[item_attr] = []
|
|
unlock_edits[item_attr].append(item.ratingKey)
|
|
item_edits += f"\nUnlock {name_display[item_attr]} (Batched)"
|
|
break
|
|
else:
|
|
try:
|
|
if option == "tmdb":
|
|
found_rating = tmdb_obj().vote_average # noqa
|
|
elif option == "imdb":
|
|
found_rating = self.config.IMDb.get_rating(imdb_id)
|
|
elif option == "omdb":
|
|
found_rating = omdb_obj().imdb_rating # noqa
|
|
elif option == "trakt_user":
|
|
_ratings = trakt_ratings()
|
|
_id = tmdb_id if self.library.is_movie else tvdb_id
|
|
if _id in _ratings:
|
|
found_rating = _ratings[_id]
|
|
else:
|
|
raise Failed
|
|
elif str(option).startswith("mdb"):
|
|
mdb_item = mdb_obj()
|
|
if option == "mdb_average":
|
|
found_rating = mdb_item.average / 10 if mdb_item.average else None # noqa
|
|
elif option == "mdb_imdb":
|
|
found_rating = mdb_item.imdb_rating if mdb_item.imdb_rating else None # noqa
|
|
elif option == "mdb_metacritic":
|
|
found_rating = mdb_item.metacritic_rating / 10 if mdb_item.metacritic_rating else None # noqa
|
|
elif option == "mdb_metacriticuser":
|
|
found_rating = mdb_item.metacriticuser_rating if mdb_item.metacriticuser_rating else None # noqa
|
|
elif option == "mdb_trakt":
|
|
found_rating = mdb_item.trakt_rating / 10 if mdb_item.trakt_rating else None # noqa
|
|
elif option == "mdb_tomatoes":
|
|
found_rating = mdb_item.tomatoes_rating / 10 if mdb_item.tomatoes_rating else None # noqa
|
|
elif option == "mdb_tomatoesaudience":
|
|
found_rating = mdb_item.tomatoesaudience_rating / 10 if mdb_item.tomatoesaudience_rating else None # noqa
|
|
elif option == "mdb_tmdb":
|
|
found_rating = mdb_item.tmdb_rating / 10 if mdb_item.tmdb_rating else None # noqa
|
|
elif option == "mdb_letterboxd":
|
|
found_rating = mdb_item.letterboxd_rating * 2 if mdb_item.letterboxd_rating else None # noqa
|
|
elif option == "mdb_myanimelist":
|
|
found_rating = mdb_item.myanimelist_rating if mdb_item.myanimelist_rating else None # noqa
|
|
else:
|
|
found_rating = mdb_item.score / 10 if mdb_item.score else None # noqa
|
|
elif option == "anidb_rating":
|
|
found_rating = anidb_obj().rating # noqa
|
|
elif option == "anidb_average":
|
|
found_rating = anidb_obj().average # noqa
|
|
elif option == "anidb_score":
|
|
found_rating = anidb_obj().score # noqa
|
|
elif option == "mal":
|
|
found_rating = mal_obj().score # noqa
|
|
else:
|
|
found_rating = option
|
|
if not found_rating:
|
|
logger.info(f"No {option} {name_display[item_attr]} Found")
|
|
raise Failed
|
|
found_rating = f"{float(found_rating):.1f}"
|
|
if str(current) != found_rating:
|
|
if found_rating not in rating_edits[item_attr]:
|
|
rating_edits[item_attr][found_rating] = []
|
|
rating_edits[item_attr][found_rating].append(item.ratingKey)
|
|
item_edits += f"\nUpdate {name_display[item_attr]} (Batched) | {found_rating}"
|
|
break
|
|
except Failed:
|
|
continue
|
|
|
|
if self.library.mass_genre_update or self.library.genre_mapper:
|
|
new_genres = []
|
|
extra_option = None
|
|
if self.library.mass_genre_update:
|
|
for option in self.library.mass_genre_update:
|
|
if option in ["lock", "unlock", "remove", "reset"]:
|
|
extra_option = option
|
|
break
|
|
try:
|
|
if option == "tmdb":
|
|
new_genres = tmdb_obj().genres # noqa
|
|
elif option == "imdb":
|
|
new_genres = self.config.IMDb.get_genres(imdb_id)
|
|
elif option == "omdb":
|
|
new_genres = omdb_obj().genres # noqa
|
|
elif option == "tvdb":
|
|
new_genres = tvdb_obj().genres # noqa
|
|
elif str(option) in anidb.weights:
|
|
new_genres = [str(t).title() for t, w in anidb_obj().tags.items() if w >= anidb.weights[str(option)]] # noqa
|
|
elif option == "mal":
|
|
new_genres = mal_obj().genres # noqa
|
|
else:
|
|
new_genres = option
|
|
if not new_genres:
|
|
logger.info(f"No {option} Genres Found")
|
|
raise Failed
|
|
break
|
|
except Failed:
|
|
continue
|
|
|
|
item_genres = [g.tag for g in item.genres]
|
|
if not new_genres and extra_option not in ["remove", "reset"]:
|
|
new_genres = item_genres
|
|
if self.library.genre_mapper:
|
|
mapped_genres = []
|
|
for genre in new_genres:
|
|
if genre in self.library.genre_mapper:
|
|
if self.library.genre_mapper[genre]:
|
|
mapped_genres.append(self.library.genre_mapper[genre])
|
|
else:
|
|
mapped_genres.append(genre)
|
|
new_genres = mapped_genres
|
|
_add = list(set(new_genres) - set(item_genres))
|
|
_remove = list(set(item_genres) - set(new_genres))
|
|
for genre_list, edit_type in [(_add, "add"), (_remove, "remove")]:
|
|
if genre_list:
|
|
for g in genre_list:
|
|
if g not in genre_edits[edit_type]:
|
|
genre_edits[edit_type][g] = []
|
|
genre_edits[edit_type][g].append(item.ratingKey)
|
|
item_edits += f"\n{edit_type.capitalize()} Genres (Batched) | {', '.join(genre_list)}"
|
|
if extra_option in ["unlock", "reset"] and ("genre" in locked_fields or _add or _remove):
|
|
if "genre" not in unlock_edits:
|
|
unlock_edits["genre"] = []
|
|
unlock_edits["genre"].append(item.ratingKey)
|
|
item_edits += "\nUnlock Genre (Batched)"
|
|
elif extra_option in ["lock", "remove"] and "genre" not in locked_fields and not _add and not _remove:
|
|
if "genre" not in lock_edits:
|
|
lock_edits["genre"] = []
|
|
lock_edits["genre"].append(item.ratingKey)
|
|
item_edits += "\nLock Genre (Batched)"
|
|
|
|
if self.library.mass_content_rating_update or self.library.content_rating_mapper:
|
|
new_rating = None
|
|
extra_option = None
|
|
if self.library.mass_content_rating_update:
|
|
for option in self.library.mass_content_rating_update:
|
|
if option in ["lock", "unlock", "remove", "reset"]:
|
|
extra_option = option
|
|
break
|
|
try:
|
|
if option == "omdb":
|
|
new_rating = omdb_obj().content_rating # noqa
|
|
elif option == "mdb":
|
|
_rating = mdb_obj().content_rating # noqa
|
|
new_rating = _rating if _rating else None
|
|
elif str(option).startswith("mdb_commonsense"):
|
|
_rating = mdb_obj().commonsense # noqa
|
|
if not _rating:
|
|
new_rating = None
|
|
elif option == "mdb_commonsense0":
|
|
new_rating = str(_rating).rjust(2, "0")
|
|
else:
|
|
new_rating = _rating
|
|
elif str(option).startswith("mdb_age_rating"):
|
|
_rating = mdb_obj().age_rating # noqa
|
|
if not _rating:
|
|
new_rating = None
|
|
elif option == "mdb_age_rating0":
|
|
new_rating = str(_rating).rjust(2, "0")
|
|
else:
|
|
new_rating = _rating
|
|
elif option == "mal":
|
|
new_rating = mal_obj().rating # noqa
|
|
else:
|
|
new_rating = option
|
|
if new_rating is None:
|
|
logger.info(f"No {option} Content Rating Found")
|
|
raise Failed
|
|
else:
|
|
new_rating = str(new_rating)
|
|
break
|
|
except Failed:
|
|
continue
|
|
|
|
is_none = False
|
|
do_lock = False
|
|
do_unlock = False
|
|
current_rating = item.contentRating
|
|
if not new_rating:
|
|
new_rating = current_rating
|
|
if self.library.content_rating_mapper:
|
|
if new_rating in self.library.content_rating_mapper:
|
|
new_rating = self.library.content_rating_mapper[new_rating]
|
|
if not new_rating:
|
|
is_none = True
|
|
if extra_option == "reset":
|
|
if current_rating:
|
|
if "contentRating" not in reset_edits:
|
|
reset_edits["contentRating"] = []
|
|
reset_edits["contentRating"].append(item.ratingKey)
|
|
item_edits += "\nReset Content Rating (Batched)"
|
|
elif "contentRating" in locked_fields:
|
|
do_unlock = True
|
|
elif extra_option == "remove" or is_none:
|
|
if current_rating:
|
|
if "contentRating" not in remove_edits:
|
|
remove_edits["contentRating"] = []
|
|
remove_edits["contentRating"].append(item.ratingKey)
|
|
item_edits += "\nRemove Content Rating (Batched)"
|
|
elif "contentRating" not in locked_fields:
|
|
do_lock = True
|
|
elif new_rating and new_rating != current_rating:
|
|
if new_rating not in content_edits:
|
|
content_edits[new_rating] = []
|
|
content_edits[new_rating].append(item.ratingKey)
|
|
item_edits += f"\nUpdate Content Rating (Batched) | {new_rating}"
|
|
do_lock = False
|
|
|
|
if extra_option == "lock" or do_lock:
|
|
if "contentRating" not in lock_edits:
|
|
lock_edits["contentRating"] = []
|
|
lock_edits["contentRating"].append(item.ratingKey)
|
|
item_edits += "\nLock Content Rating (Batched)"
|
|
elif extra_option == "unlock" or do_unlock:
|
|
if "contentRating" not in unlock_edits:
|
|
unlock_edits["contentRating"] = []
|
|
unlock_edits["contentRating"].append(item.ratingKey)
|
|
item_edits += "\nUnlock Content Rating (Batched)"
|
|
|
|
if self.library.mass_original_title_update:
|
|
current_original = item.originalTitle
|
|
for option in self.library.mass_original_title_update:
|
|
if option in ["lock", "remove"]:
|
|
if option == "remove" and current_original:
|
|
if "originalTitle" not in remove_edits:
|
|
remove_edits["originalTitle"] = []
|
|
remove_edits["originalTitle"].append(item.ratingKey)
|
|
item_edits += "\nRemove Original Title (Batched)"
|
|
elif "originalTitle" not in locked_fields:
|
|
if "originalTitle" not in lock_edits:
|
|
lock_edits["originalTitle"] = []
|
|
lock_edits["originalTitle"].append(item.ratingKey)
|
|
item_edits += "\nLock Original Title (Batched)"
|
|
break
|
|
elif option in ["unlock", "reset"]:
|
|
if option == "reset" and current_original:
|
|
if "originalTitle" not in reset_edits:
|
|
reset_edits["originalTitle"] = []
|
|
reset_edits["originalTitle"].append(item.ratingKey)
|
|
item_edits += "\nReset Original Title (Batched)"
|
|
elif "originalTitle" in locked_fields:
|
|
if "originalTitle" not in unlock_edits:
|
|
unlock_edits["originalTitle"] = []
|
|
unlock_edits["originalTitle"].append(item.ratingKey)
|
|
item_edits += "\nUnlock Original Title (Batched)"
|
|
break
|
|
else:
|
|
try:
|
|
if option == "anidb":
|
|
new_original_title = anidb_obj().main_title # noqa
|
|
elif option == "anidb_official":
|
|
new_original_title = anidb_obj().official_title # noqa
|
|
elif option == "mal":
|
|
new_original_title = mal_obj().title # noqa
|
|
elif option == "mal_english":
|
|
new_original_title = mal_obj().title_english # noqa
|
|
elif option == "mal_japanese":
|
|
new_original_title = mal_obj().title_japanese # noqa
|
|
else:
|
|
new_original_title = option
|
|
if not new_original_title:
|
|
logger.info(f"No {option} Original Title Found")
|
|
raise Failed
|
|
if str(current_original) != str(new_original_title):
|
|
item.editOriginalTitle(new_original_title)
|
|
item_edits += f"\nUpdated Original Title | {new_original_title}"
|
|
break
|
|
except Failed:
|
|
continue
|
|
|
|
if self.library.mass_studio_update:
|
|
current_studio = item.studio
|
|
for option in self.library.mass_studio_update:
|
|
if option in ["lock", "remove"]:
|
|
if option == "remove" and current_studio:
|
|
if "studio" not in remove_edits:
|
|
remove_edits["studio"] = []
|
|
remove_edits["studio"].append(item.ratingKey)
|
|
item_edits += "\nRemove Studio (Batched)"
|
|
elif "studio" not in locked_fields:
|
|
if "studio" not in lock_edits:
|
|
lock_edits["studio"] = []
|
|
lock_edits["studio"].append(item.ratingKey)
|
|
item_edits += "\nLock Studio (Batched)"
|
|
break
|
|
elif option in ["unlock", "reset"]:
|
|
if option == "reset" and current_studio:
|
|
if "studio" not in reset_edits:
|
|
reset_edits["studio"] = []
|
|
reset_edits["studio"].append(item.ratingKey)
|
|
item_edits += "\nReset Studio (Batched)"
|
|
elif "studio" in locked_fields:
|
|
if "studio" not in unlock_edits:
|
|
unlock_edits["studio"] = []
|
|
unlock_edits["studio"].append(item.ratingKey)
|
|
item_edits += "\nUnlock Studio (Batched)"
|
|
break
|
|
else:
|
|
try:
|
|
if option == "tmdb":
|
|
new_studio = tmdb_obj().studio # noqa
|
|
elif option == "anidb":
|
|
new_studio = anidb_obj().studio # noqa
|
|
elif option == "mal":
|
|
new_studio = mal_obj().studio # noqa
|
|
else:
|
|
new_studio = option
|
|
if not new_studio:
|
|
logger.info(f"No {option} Studio Found")
|
|
raise Failed
|
|
if str(current_studio) != str(new_studio):
|
|
if new_studio not in studio_edits:
|
|
studio_edits[new_studio] = []
|
|
studio_edits[new_studio].append(item.ratingKey)
|
|
item_edits += f"\nUpdate Studio (Batched) | {new_studio}"
|
|
break
|
|
except Failed:
|
|
continue
|
|
|
|
for attribute, item_attr in [
|
|
(self.library.mass_originally_available_update, "originallyAvailableAt"),
|
|
(self.library.mass_added_at_update, "addedAt")
|
|
]:
|
|
if attribute:
|
|
current = getattr(item, item_attr)
|
|
if current:
|
|
current = current.strftime("%Y-%m-%d")
|
|
for option in attribute:
|
|
if option in ["lock", "remove"]:
|
|
if option == "remove" and current:
|
|
if item_attr not in remove_edits:
|
|
remove_edits[item_attr] = []
|
|
remove_edits[item_attr].append(item.ratingKey)
|
|
item_edits += f"\nRemove {name_display[item_attr]} (Batched)"
|
|
elif item_attr not in locked_fields:
|
|
if item_attr not in lock_edits:
|
|
lock_edits[item_attr] = []
|
|
lock_edits[item_attr].append(item.ratingKey)
|
|
item_edits += f"\nLock {name_display[item_attr]} (Batched)"
|
|
break
|
|
elif option in ["unlock", "reset"]:
|
|
if option == "reset" and current:
|
|
if item_attr not in reset_edits:
|
|
reset_edits[item_attr] = []
|
|
reset_edits[item_attr].append(item.ratingKey)
|
|
item_edits += f"\nReset {name_display[item_attr]} (Batched)"
|
|
elif item_attr in locked_fields:
|
|
if item_attr not in unlock_edits:
|
|
unlock_edits[item_attr] = []
|
|
unlock_edits[item_attr].append(item.ratingKey)
|
|
item_edits += f"\nUnlock {name_display[item_attr]} (Batched)"
|
|
break
|
|
else:
|
|
try:
|
|
if option == "tmdb":
|
|
new_date = tmdb_obj().release_date if self.library.is_movie else tmdb_obj().first_air_date # noqa
|
|
elif option == "omdb":
|
|
new_date = omdb_obj().released # noqa
|
|
elif option == "tvdb":
|
|
new_date = tvdb_obj().release_date # noqa
|
|
elif option == "mdb":
|
|
new_date = mdb_obj().released # noqa
|
|
elif option == "mdb_digital":
|
|
new_date = mdb_obj().released_digital # noqa
|
|
elif option == "anidb":
|
|
new_date = anidb_obj().released # noqa
|
|
elif option == "mal":
|
|
new_date = mal_obj().aired # noqa
|
|
else:
|
|
new_date = option
|
|
if not new_date:
|
|
logger.info(f"No {option} {name_display[item_attr]} Found")
|
|
raise Failed
|
|
new_date = new_date.strftime("%Y-%m-%d")
|
|
if current != new_date:
|
|
if new_date not in date_edits[item_attr]:
|
|
date_edits[item_attr][new_date] = []
|
|
date_edits[item_attr][new_date].append(item.ratingKey)
|
|
item_edits += f"\nUpdate {name_display[item_attr]} (Batched) | {new_date}"
|
|
break
|
|
except Failed:
|
|
continue
|
|
|
|
if len(item_edits) > 0:
|
|
logger.info(f"Item Edits{item_edits}")
|
|
else:
|
|
logger.info("No Item Edits")
|
|
|
|
if self.library.mass_poster_update or self.library.mass_background_update:
|
|
try:
|
|
new_poster, new_background, item_dir, name = self.library.find_item_assets(item)
|
|
except Failed:
|
|
new_poster, new_background, item_dir, name = None, None, None, None
|
|
try:
|
|
tmdb_item = tmdb_obj()
|
|
except Failed:
|
|
tmdb_item = None
|
|
|
|
if self.library.mass_poster_update:
|
|
source = self.library.mass_poster_update["source"]
|
|
ignore_locked = self.library.mass_poster_update["ignore_locked"]
|
|
ignore_overlays = self.library.mass_poster_update.get("ignore_overlays")
|
|
thumb_locked = any(f.name == "thumb" and f.locked for f in item.fields)
|
|
labels = [la.tag for la in self.library.item_labels(item)]
|
|
has_overlay_label = "Overlay" in labels
|
|
|
|
# Bypass ignore_locked and ignore_overlays checks if the source is "unlock" or "lock"
|
|
if source in ["unlock", "lock"]:
|
|
self.library.poster_update(item, new_poster, tmdb=tmdb_item.poster_url if tmdb_item else None, title=item.title) # noqa
|
|
elif ignore_locked and thumb_locked:
|
|
# Skip processing if ignore_locked is True and thumb is locked
|
|
pass
|
|
elif ignore_overlays and has_overlay_label:
|
|
# Skip processing if ignore_overlays is True and Overlay label is found
|
|
pass
|
|
else:
|
|
self.library.poster_update(item, new_poster, tmdb=tmdb_item.poster_url if tmdb_item else None, title=item.title) # noqa
|
|
|
|
if self.library.mass_background_update:
|
|
source = self.library.mass_background_update["source"]
|
|
ignore_locked = self.library.mass_background_update["ignore_locked"]
|
|
ignore_overlays = self.library.mass_poster_update["ignore_overlays"]
|
|
art_locked = any(f.name == "art" and f.locked for f in item.fields)
|
|
|
|
if source in ["unlock", "lock"]:
|
|
self.library.background_update(item, new_background, tmdb=tmdb_item.backdrop_url if tmdb_item else None, title=item.title) # noqa
|
|
|
|
elif not (ignore_locked and art_locked):
|
|
self.library.background_update(item, new_background, tmdb=tmdb_item.backdrop_url if tmdb_item else None, title=item.title) # noqa
|
|
|
|
|
|
if self.library.is_show and (
|
|
(self.library.mass_poster_update and
|
|
(self.library.mass_poster_update["seasons"] or self.library.mass_poster_update["episodes"])) or
|
|
(self.library.mass_background_update and
|
|
(self.library.mass_background_update["seasons"] or self.library.mass_background_update["episodes"]))
|
|
):
|
|
real_show = None
|
|
try:
|
|
real_show = tmdb_item.load_show() if tmdb_item else None # noqa
|
|
except Failed as e:
|
|
logger.error(e)
|
|
tmdb_seasons = {s.season_number: s for s in real_show.seasons} if real_show else {}
|
|
for season in self.library.query(item.seasons):
|
|
if (self.library.mass_poster_update and self.library.mass_poster_update["seasons"]) or \
|
|
(self.library.mass_background_update and self.library.mass_background_update["seasons"]):
|
|
try:
|
|
season_poster, season_background, _, _ = self.library.find_item_assets(season, item_asset_directory=item_dir, folder_name=name)
|
|
except Failed:
|
|
season_poster = None
|
|
season_background = None
|
|
season_title = f"S{season.seasonNumber} {season.title}"
|
|
tmdb_poster = tmdb_seasons[season.seasonNumber].poster_url if season.seasonNumber in tmdb_seasons else None
|
|
if self.library.mass_poster_update and self.library.mass_poster_update["seasons"]:
|
|
self.library.poster_update(season, season_poster, tmdb=tmdb_poster, title=season_title if season else None)
|
|
if self.library.mass_background_update and self.library.mass_background_update["seasons"]:
|
|
self.library.background_update(season, season_background, title=season_title if season else None)
|
|
|
|
if (self.library.mass_poster_update and self.library.mass_poster_update["episodes"]) or \
|
|
(self.library.mass_background_update and self.library.mass_background_update["episodes"]):
|
|
tmdb_episodes = {}
|
|
if season.seasonNumber in tmdb_seasons:
|
|
for episode in tmdb_seasons[season.seasonNumber].episodes:
|
|
episode._partial = False
|
|
try:
|
|
tmdb_episodes[episode.episode_number] = episode
|
|
except NotFound:
|
|
logger.error(f"TMDb Error: An Episode of Season {season.seasonNumber} was Not Found")
|
|
|
|
for episode in self.library.query(season.episodes):
|
|
try:
|
|
episode = self.library.reload(episode)
|
|
except Failed:
|
|
logger.error(f"S{season.seasonNumber}E{episode.episodeNumber} {episode.title} Failed to Reload from Plex")
|
|
continue
|
|
try:
|
|
episode_poster, episode_background, _, _ = self.library.find_item_assets(episode, item_asset_directory=item_dir, folder_name=name)
|
|
except Failed:
|
|
episode_poster = None
|
|
episode_background = None
|
|
episode_title = f"S{season.seasonNumber}E{episode.episodeNumber} {episode.title}"
|
|
tmdb_poster = tmdb_episodes[episode.episodeNumber].still_url if episode.episodeNumber in tmdb_episodes else None
|
|
if self.library.mass_poster_update and self.library.mass_poster_update["episodes"]:
|
|
self.library.poster_update(episode, episode_poster, tmdb=tmdb_poster, title=episode_title if episode else None)
|
|
if self.library.mass_background_update and self.library.mass_background_update["episodes"]:
|
|
self.library.background_update(episode, episode_background, title=episode_title if episode else None)
|
|
|
|
episode_ops = [
|
|
(self.library.mass_episode_audience_rating_update, "audienceRating"),
|
|
(self.library.mass_episode_critic_rating_update, "rating"),
|
|
(self.library.mass_episode_user_rating_update, "userRating")
|
|
]
|
|
|
|
if any([x is not None for x, _ in episode_ops]):
|
|
|
|
if any(["imdb" in x for x, _ in episode_ops if x]) and not imdb_id:
|
|
logger.info(f"No IMDb ID for Guid: {item.guid}")
|
|
|
|
for ep in item.episodes():
|
|
ep = self.library.reload(ep)
|
|
item_title = self.library.get_item_sort_title(ep, atr="title")
|
|
logger.info("")
|
|
logger.info(f"Processing {item_title}")
|
|
item_edits = ""
|
|
|
|
for attribute, item_attr in episode_ops:
|
|
if attribute:
|
|
current = getattr(ep, item_attr)
|
|
for option in attribute:
|
|
if option in ["lock", "remove"]:
|
|
if option == "remove" and current:
|
|
if item_attr not in ep_remove_edits:
|
|
ep_remove_edits[item_attr] = []
|
|
ep_remove_edits[item_attr].append(ep)
|
|
item_edits += f"\nRemove {name_display[item_attr]} (Batched)"
|
|
elif item_attr not in locked_fields:
|
|
if item_attr not in ep_lock_edits:
|
|
ep_lock_edits[item_attr] = []
|
|
ep_lock_edits[item_attr].append(ep)
|
|
item_edits += f"\nLock {name_display[item_attr]} (Batched)"
|
|
break
|
|
elif option in ["unlock", "reset"]:
|
|
if option == "reset" and current:
|
|
if item_attr not in ep_reset_edits:
|
|
ep_reset_edits[item_attr] = []
|
|
ep_reset_edits[item_attr].append(ep)
|
|
item_edits += f"\nReset {name_display[item_attr]} (Batched)"
|
|
elif item_attr in locked_fields:
|
|
if item_attr not in ep_unlock_edits:
|
|
ep_unlock_edits[item_attr] = []
|
|
ep_unlock_edits[item_attr].append(ep)
|
|
item_edits += f"\nUnlock {name_display[item_attr]} (Batched)"
|
|
break
|
|
else:
|
|
try:
|
|
try:
|
|
tmdb_item = tmdb_obj()
|
|
except Failed:
|
|
tmdb_item = None
|
|
found_rating = None
|
|
if tmdb_item and option == "tmdb":
|
|
try:
|
|
found_rating = self.config.TMDb.get_episode(tmdb_item.tmdb_id, ep.seasonNumber, ep.episodeNumber).vote_average # noqa
|
|
except Failed as er:
|
|
logger.error(er)
|
|
elif imdb_id and option == "imdb":
|
|
found_rating = self.config.IMDb.get_episode_rating(imdb_id, ep.seasonNumber, ep.episodeNumber)
|
|
else:
|
|
try:
|
|
found_rating = float(option)
|
|
except ValueError:
|
|
pass
|
|
if not found_rating:
|
|
logger.info(f"No {option} {name_display[item_attr]} Found")
|
|
raise Failed
|
|
found_rating = f"{float(found_rating):.1f}"
|
|
if str(current) != found_rating:
|
|
if found_rating not in ep_rating_edits[item_attr]:
|
|
ep_rating_edits[item_attr][found_rating] = []
|
|
ep_rating_edits[item_attr][found_rating].append(ep)
|
|
item_edits += f"\nUpdate {name_display[item_attr]} (Batched) | {found_rating}"
|
|
break
|
|
except Failed:
|
|
continue
|
|
|
|
if len(item_edits) > 0:
|
|
logger.info(f"Item Edits:{item_edits}")
|
|
|
|
logger.info("")
|
|
logger.separator("Batch Updates", space=False, border=False)
|
|
logger.info("")
|
|
|
|
def get_batch_info(placement, total, display_attr, total_count, display_value=None, is_episode=False, out_type=None, tag_type=None):
|
|
return f"Batch {name_display[display_attr] if display_attr in name_display else display_attr.capitalize()} Update ({placement}/{total}): " \
|
|
f"{f'{out_type.capitalize()} ' if out_type else ''}" \
|
|
f"{f'Adding {display_value} to ' if tag_type == 'add' else f'Removing {display_value} from ' if tag_type == 'remove' else ''}" \
|
|
f"{total_count} {'Episode' if is_episode else 'Movie' if self.library.is_movie else 'Show'}" \
|
|
f"{'s' if total_count > 1 else ''}{'' if out_type or tag_type else f' updated to {display_value}'}"
|
|
|
|
for tag_attribute, edit_dict in [("Label", label_edits), ("Genre", genre_edits)]:
|
|
for edit_type, batch_edits in edit_dict.items():
|
|
_size = len(batch_edits.items())
|
|
for i, (tag_name, rating_keys) in enumerate(sorted(batch_edits.items()), 1):
|
|
logger.info(get_batch_info(i, _size, tag_attribute, len(rating_keys), display_value=tag_name, tag_type=edit_type))
|
|
self.library.Plex.batchMultiEdits(self.library.load_list_from_cache(rating_keys))
|
|
getattr(self.library.Plex, f"{edit_type}{tag_attribute}")(tag_name)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
for item_attr, _edits in rating_edits.items():
|
|
_size = len(rating_edits.items())
|
|
for i, (new_rating, rating_keys) in enumerate(sorted(_edits.items()), 1):
|
|
logger.info(get_batch_info(i, _size, item_attr, len(rating_keys), display_value=new_rating))
|
|
self.library.Plex.batchMultiEdits(self.library.load_list_from_cache(rating_keys))
|
|
self.library.Plex.editField(item_attr, new_rating)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(content_edits.items())
|
|
for i, (new_rating, rating_keys) in enumerate(sorted(content_edits.items()), 1):
|
|
logger.info(get_batch_info(i, _size, "contentRating", len(rating_keys), display_value=new_rating))
|
|
self.library.Plex.batchMultiEdits(self.library.load_list_from_cache(rating_keys))
|
|
self.library.Plex.editContentRating(new_rating)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(studio_edits.items())
|
|
for i, (new_studio, rating_keys) in enumerate(sorted(studio_edits.items()), 1):
|
|
logger.info(get_batch_info(i, _size, "studio", len(rating_keys), display_value=new_studio))
|
|
self.library.Plex.batchMultiEdits(self.library.load_list_from_cache(rating_keys))
|
|
self.library.Plex.editStudio(new_studio)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(date_edits["originallyAvailableAt"].items())
|
|
for i, (new_date, rating_keys) in enumerate(sorted(date_edits["originallyAvailableAt"].items()), 1):
|
|
logger.info(get_batch_info(i, _size, "originallyAvailableAt", len(rating_keys), display_value=new_date))
|
|
self.library.Plex.batchMultiEdits(self.library.load_list_from_cache(rating_keys))
|
|
self.library.Plex.editOriginallyAvailable(new_date)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
epoch = datetime(1970, 1, 1)
|
|
_size = len(date_edits["addedAt"].items())
|
|
for i, (new_date, rating_keys) in enumerate(sorted(date_edits["addedAt"].items()), 1):
|
|
logger.info(get_batch_info(i, _size, "addedAt", len(rating_keys), display_value=new_date))
|
|
self.library.Plex.batchMultiEdits(self.library.load_list_from_cache(rating_keys))
|
|
new_date = datetime.strptime(new_date, "%Y-%m-%d")
|
|
logger.trace(new_date)
|
|
try:
|
|
ts = int(round(new_date.timestamp()))
|
|
except (TypeError, OSError):
|
|
offset = int(datetime(2000, 1, 1, tzinfo=timezone.utc).timestamp() - datetime(2000, 1, 1).timestamp())
|
|
ts = int((new_date - epoch).total_seconds()) - offset
|
|
logger.trace(epoch + timedelta(seconds=ts))
|
|
self.library.Plex.editAddedAt(ts)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(remove_edits.items())
|
|
for i, (field_attr, rating_keys) in enumerate(remove_edits.items(), 1):
|
|
logger.info(get_batch_info(i, _size, field_attr, len(rating_keys), out_type="remov"))
|
|
self.library.Plex.batchMultiEdits(self.library.load_list_from_cache(rating_keys))
|
|
self.library.Plex.editField(field_attr, None, locked=True)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(reset_edits.items())
|
|
for i, (field_attr, rating_keys) in enumerate(reset_edits.items(), 1):
|
|
logger.info(get_batch_info(i, _size, field_attr, len(rating_keys), out_type="reset"))
|
|
self.library.Plex.batchMultiEdits(self.library.load_list_from_cache(rating_keys))
|
|
self.library.Plex.editField(field_attr, None, locked=False)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(lock_edits.items())
|
|
for i, (field_attr, rating_keys) in enumerate(lock_edits.items(), 1):
|
|
logger.info(get_batch_info(i, _size, field_attr, len(rating_keys), out_type="lock"))
|
|
self.library.Plex.batchMultiEdits(self.library.load_list_from_cache(rating_keys))
|
|
self.library.Plex._edit(**{f"{field_attr}.locked": 1})
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(unlock_edits.items())
|
|
for i, (field_attr, rating_keys) in enumerate(unlock_edits.items(), 1):
|
|
logger.info(get_batch_info(i, _size, field_attr, len(rating_keys), out_type="unlock"))
|
|
self.library.Plex.batchMultiEdits(self.library.load_list_from_cache(rating_keys))
|
|
self.library.Plex._edit(**{f"{field_attr}.locked": 0})
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
for item_attr, _edits in ep_rating_edits.items():
|
|
_size = len(_edits.items())
|
|
for i, (new_rating, rating_keys) in enumerate(sorted(_edits.items()), 1):
|
|
logger.info(get_batch_info(i, _size, item_attr, len(rating_keys), display_value=new_rating, is_episode=True))
|
|
self.library.Plex.batchMultiEdits(rating_keys)
|
|
self.library.Plex.editField(item_attr, new_rating)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(ep_remove_edits.items())
|
|
for i, (field_attr, rating_keys) in enumerate(ep_remove_edits.items(), 1):
|
|
logger.info(get_batch_info(i, _size, field_attr, len(rating_keys), is_episode=True, out_type="remov"))
|
|
self.library.Plex.batchMultiEdits(rating_keys)
|
|
self.library.Plex.editField(field_attr, None, locked=True)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(ep_reset_edits.items())
|
|
for i, (field_attr, rating_keys) in enumerate(ep_reset_edits.items(), 1):
|
|
logger.info(get_batch_info(i, _size, field_attr, len(rating_keys), is_episode=True, out_type="reset"))
|
|
self.library.Plex.batchMultiEdits(rating_keys)
|
|
self.library.Plex.editField(field_attr, None, locked=False)
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(ep_lock_edits.items())
|
|
for i, (field_attr, rating_keys) in enumerate(ep_lock_edits.items(), 1):
|
|
logger.info(get_batch_info(i, _size, field_attr, len(rating_keys), is_episode=True, out_type="lock"))
|
|
self.library.Plex.batchMultiEdits(rating_keys)
|
|
self.library.Plex._edit(**{f"{field_attr}.locked": 1})
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
_size = len(ep_unlock_edits.items())
|
|
for i, (field_attr, rating_keys) in enumerate(ep_unlock_edits.items(), 1):
|
|
logger.info(get_batch_info(i, _size, field_attr, len(rating_keys), is_episode=True, out_type="unlock"))
|
|
self.library.Plex.batchMultiEdits(rating_keys)
|
|
self.library.Plex._edit(**{f"{field_attr}.locked": 0})
|
|
self.library.Plex.saveMultiEdits()
|
|
|
|
if self.library.Radarr and self.library.radarr_add_all_existing:
|
|
logger.info("")
|
|
logger.separator(f"Radarr Add All Existing: {len(radarr_adds)} Movies", space=False, border=False)
|
|
logger.info("")
|
|
try:
|
|
self.library.Radarr.add_tmdb(radarr_adds)
|
|
except Failed as e:
|
|
logger.error(e)
|
|
|
|
if self.library.Sonarr and self.library.sonarr_add_all_existing:
|
|
logger.info("")
|
|
logger.separator(f"Sonarr Add All Existing: {len(sonarr_adds)} Shows", space=False, border=False)
|
|
logger.info("")
|
|
try:
|
|
self.library.Sonarr.add_tvdb(sonarr_adds)
|
|
except Failed as e:
|
|
logger.error(e)
|
|
|
|
if self.library.radarr_remove_by_tag:
|
|
logger.info("")
|
|
logger.separator(f"Radarr Remove {len(self.library.radarr_remove_by_tag)} Movies with Tags: {', '.join(self.library.radarr_remove_by_tag)}", space=False, border=False)
|
|
logger.info("")
|
|
self.library.Radarr.remove_all_with_tags(self.library.radarr_remove_by_tag)
|
|
if self.library.sonarr_remove_by_tag:
|
|
logger.info("")
|
|
logger.separator(f"Sonarr Remove {len(self.library.sonarr_remove_by_tag)} Shows with Tags: {', '.join(self.library.sonarr_remove_by_tag)}", space=False, border=False)
|
|
logger.info("")
|
|
self.library.Sonarr.remove_all_with_tags(self.library.sonarr_remove_by_tag)
|
|
|
|
if self.library.delete_collections or self.library.show_unmanaged or self.library.show_unconfigured or self.library.assets_for_all or self.library.mass_collection_mode:
|
|
logger.info("")
|
|
logger.separator("Collection Operations", space=False, border=False)
|
|
logger.info("")
|
|
|
|
if self.library.delete_collections:
|
|
logger.info("")
|
|
logger.separator("Deleting Collections", space=False, border=False)
|
|
logger.info("")
|
|
|
|
less = self.library.delete_collections["less"] if self.library.delete_collections and self.library.delete_collections["less"] is not None else None
|
|
managed = self.library.delete_collections["managed"] if self.library.delete_collections else None
|
|
configured = self.library.delete_collections["configured"] if self.library.delete_collections else None
|
|
ignore_smart = self.library.delete_collections["ignore_empty_smart_collections"] if self.library.delete_collections else True
|
|
unmanaged_collections = []
|
|
unconfigured_collections = []
|
|
all_collections = self.library.get_all_collections()
|
|
for i, col in enumerate(all_collections, 1):
|
|
logger.ghost(f"Reading Collection: {i}/{len(all_collections)} {col.title}")
|
|
col = self.library.reload(col, force=True)
|
|
labels = [la.tag for la in self.library.item_labels(col)]
|
|
|
|
if should_be_deleted(col, labels, configured, managed, less, ignore_smart):
|
|
try:
|
|
self.library.delete(col)
|
|
logger.info(f"{col.title} Deleted")
|
|
except Failed as e:
|
|
logger.error(e)
|
|
else:
|
|
if "PMM" not in labels and "Kometa" not in labels:
|
|
unmanaged_collections.append(col)
|
|
if col.title not in self.library.collection_names:
|
|
unconfigured_collections.append(col)
|
|
|
|
if self.library.show_unmanaged and len(unmanaged_collections) > 0:
|
|
logger.info("")
|
|
logger.separator(f"Unmanaged Collections in {self.library.name} Library", space=False, border=False)
|
|
logger.info("")
|
|
for col in unmanaged_collections:
|
|
logger.info(col.title)
|
|
logger.info("")
|
|
logger.info(f"{len(unmanaged_collections)} Unmanaged Collection{'s' if len(unmanaged_collections) > 1 else ''}")
|
|
elif self.library.show_unmanaged:
|
|
logger.info("")
|
|
logger.separator(f"No Unmanaged Collections in {self.library.name} Library", space=False, border=False)
|
|
logger.info("")
|
|
|
|
if self.library.show_unconfigured and len(unconfigured_collections) > 0:
|
|
logger.info("")
|
|
logger.separator(f"Unconfigured Collections in {self.library.name} Library", space=False, border=False)
|
|
logger.info("")
|
|
for col in unconfigured_collections:
|
|
logger.info(col.title)
|
|
logger.info("")
|
|
logger.info(f"{len(unconfigured_collections)} Unconfigured Collection{'s' if len(unconfigured_collections) > 1 else ''}")
|
|
elif self.library.show_unconfigured:
|
|
logger.info("")
|
|
logger.separator(f"No Unconfigured Collections in {self.library.name} Library", space=False, border=False)
|
|
logger.info("")
|
|
|
|
if self.library.assets_for_all_collections and len(unconfigured_collections) > 0:
|
|
logger.info("")
|
|
logger.separator(f"Unconfigured Collection Assets Check for {self.library.name} Library", space=False, border=False)
|
|
logger.info("")
|
|
for col in unconfigured_collections:
|
|
try:
|
|
poster, background, item_dir, name = self.library.find_item_assets(col)
|
|
if poster or background:
|
|
self.library.upload_images(col, poster=poster, background=background)
|
|
elif self.library.show_missing_assets:
|
|
logger.warning(f"Asset Warning: No poster or background found in an assets folder for '{name}'")
|
|
except Failed as e:
|
|
logger.warning(e)
|
|
|
|
if self.library.mass_collection_mode:
|
|
logger.info("")
|
|
logger.separator(f"Unconfigured Mass Collection Mode to {self.library.mass_collection_mode} for {self.library.name} Library", space=False, border=False)
|
|
logger.info("")
|
|
for col in unconfigured_collections:
|
|
if int(col.collectionMode) not in plex.collection_mode_keys \
|
|
or plex.collection_mode_keys[int(col.collectionMode)] != self.library.mass_collection_mode:
|
|
self.library.collection_mode_query(col, self.library.mass_collection_mode)
|
|
logger.info(f"{col.title} Collection Mode Updated")
|
|
|
|
if self.library.metadata_backup:
|
|
logger.info("")
|
|
logger.separator(f"Metadata Backup for {self.library.name} Library", space=False, border=False)
|
|
logger.info("")
|
|
logger.info(f"Metadata Backup Path: {self.library.metadata_backup['path']}")
|
|
logger.info("")
|
|
yaml = None
|
|
if os.path.exists(self.library.metadata_backup["path"]):
|
|
try:
|
|
yaml = self.config.Requests.file_yaml(self.library.metadata_backup["path"])
|
|
except Failed as e:
|
|
logger.error(e)
|
|
filename, file_extension = os.path.splitext(self.library.metadata_backup["path"])
|
|
i = 1
|
|
while os.path.exists(f"{filename}{i}{file_extension}"):
|
|
i += 1
|
|
os.rename(self.library.metadata_backup["path"], f"{filename}{i}{file_extension}")
|
|
logger.error(f"Backup failed to load saving copy to {filename}{i}{file_extension}")
|
|
if not yaml:
|
|
yaml = self.config.Requests.file_yaml(self.library.metadata_backup["path"], create=True)
|
|
if "metadata" not in yaml.data or not isinstance(yaml.data["metadata"], dict):
|
|
yaml.data["metadata"] = {}
|
|
special_names = {}
|
|
for mk, mv in yaml.data["metadata"].items():
|
|
if mv and "title" in mv:
|
|
special_names[mv["title"]] = mk
|
|
if "year" in mv:
|
|
special_names[f"{mv['title']} ({mv['year']})"] = mk
|
|
items = self.library.get_all(load=True)
|
|
titles = []
|
|
year_titles = []
|
|
for item in items:
|
|
titles.append(item.title)
|
|
if isinstance(item, (Movie, Show)):
|
|
year_titles.append(f"{item.title} ({item.year})")
|
|
for i, item in enumerate(items, 1):
|
|
logger.ghost(f"Processing: {i}/{len(items)} {item.title}")
|
|
map_key, attrs = self.library.get_locked_attributes(item, titles, year_titles)
|
|
if map_key in special_names:
|
|
map_key = special_names[map_key]
|
|
og_dict = yaml.data["metadata"][map_key] if map_key in yaml.data["metadata"] and yaml.data["metadata"][map_key] and isinstance(yaml.data["metadata"][map_key], dict) else {}
|
|
if attrs or (self.library.metadata_backup["add_blank_entries"] and not og_dict):
|
|
|
|
def loop_dict(looping, dest_dict):
|
|
if not looping:
|
|
return None
|
|
for lk, lv in looping.items():
|
|
if isinstance(lv, dict) and lk in dest_dict and dest_dict[lk] and isinstance(dest_dict[lk], dict):
|
|
dest_dict[lk] = loop_dict(lv, dest_dict[lk])
|
|
else:
|
|
dest_dict[lk] = lv
|
|
return dest_dict
|
|
|
|
yaml.data["metadata"][map_key] = loop_dict(attrs, og_dict)
|
|
logger.exorcise()
|
|
yaml.save()
|
|
logger.info(f"{len(yaml.data['metadata'])} {self.library.type}{'s' if len(yaml.data['metadata']) > 1 else ''} Backed Up")
|
|
|
|
operation_run_time = str(datetime.now() - operation_start).split('.')[0]
|
|
logger.info("")
|
|
logger.separator(f"Finished {self.library.name} Library Operations\nOperations Run Time: {operation_run_time}")
|
|
return operation_run_time
|