You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
434 lines
27 KiB
434 lines
27 KiB
import logging, os, re, requests
|
|
from datetime import datetime, timedelta
|
|
from modules import util
|
|
from modules.util import Failed
|
|
from plexapi.exceptions import BadRequest, NotFound, Unauthorized
|
|
from plexapi.library import MovieSection, ShowSection
|
|
from plexapi.collection import Collections
|
|
from plexapi.server import PlexServer
|
|
from plexapi.video import Movie, Show
|
|
from retrying import retry
|
|
from ruamel import yaml
|
|
|
|
logger = logging.getLogger("Plex Meta Manager")
|
|
|
|
class PlexAPI:
|
|
def __init__(self, params, TMDb, TVDb):
|
|
try: self.PlexServer = PlexServer(params["plex"]["url"], params["plex"]["token"], timeout=params["plex"]["timeout"])
|
|
except Unauthorized: raise Failed("Plex Error: Plex token is invalid")
|
|
except ValueError as e: raise Failed(f"Plex Error: {e}")
|
|
except requests.exceptions.ConnectionError:
|
|
util.print_stacktrace()
|
|
raise Failed("Plex Error: Plex url is invalid")
|
|
self.is_movie = params["library_type"] == "movie"
|
|
self.is_show = params["library_type"] == "show"
|
|
self.Plex = next((s for s in self.PlexServer.library.sections() if s.title == params["name"] and ((self.is_movie and isinstance(s, MovieSection)) or (self.is_show and isinstance(s, ShowSection)))), None)
|
|
if not self.Plex: raise Failed(f"Plex Error: Plex Library {params['name']} not found")
|
|
try: self.data, ind, bsi = yaml.util.load_yaml_guess_indent(open(params["metadata_path"], encoding="utf-8"))
|
|
except yaml.scanner.ScannerError as e: raise Failed(f"YAML Error: {util.tab_new_lines(e)}")
|
|
|
|
def get_dict(attribute):
|
|
if attribute in self.data:
|
|
if self.data[attribute]:
|
|
if isinstance(self.data[attribute], dict): return self.data[attribute]
|
|
else: logger.warning(f"Config Warning: {attribute} must be a dictionary")
|
|
else: logger.warning(f"Config Warning: {attribute} attribute is blank")
|
|
return None
|
|
|
|
self.metadata = get_dict("metadata")
|
|
self.templates = get_dict("templates")
|
|
self.collections = get_dict("collections")
|
|
|
|
if self.metadata is None and self.collections is None:
|
|
raise Failed("YAML Error: metadata attributes or collections attribute required")
|
|
|
|
if params["asset_directory"]:
|
|
for ad in params["asset_directory"]:
|
|
logger.info(f"Using Asset Directory: {ad}")
|
|
|
|
self.TMDb = TMDb
|
|
self.TVDb = TVDb
|
|
self.Radarr = None
|
|
self.Sonarr = None
|
|
self.Tautulli = None
|
|
self.name = params["name"]
|
|
self.missing_path = os.path.join(os.path.dirname(os.path.abspath(params["metadata_path"])), f"{os.path.splitext(os.path.basename(params['metadata_path']))[0]}_missing.yml")
|
|
self.metadata_path = params["metadata_path"]
|
|
self.asset_directory = params["asset_directory"]
|
|
self.sync_mode = params["sync_mode"]
|
|
self.show_unmanaged = params["show_unmanaged"]
|
|
self.show_filtered = params["show_filtered"]
|
|
self.show_missing = params["show_missing"]
|
|
self.save_missing = params["save_missing"]
|
|
self.mass_genre_update = params["mass_genre_update"]
|
|
self.plex = params["plex"]
|
|
self.timeout = params["plex"]["timeout"]
|
|
self.missing = {}
|
|
self.run_again = []
|
|
|
|
@retry(stop_max_attempt_number=6, wait_fixed=10000)
|
|
def search(self, title, libtype=None, year=None):
|
|
if libtype is not None and year is not None: return self.Plex.search(title=title, year=year, libtype=libtype)
|
|
elif libtype is not None: return self.Plex.search(title=title, libtype=libtype)
|
|
elif year is not None: return self.Plex.search(title=title, year=year)
|
|
else: return self.Plex.search(title=title)
|
|
|
|
@retry(stop_max_attempt_number=6, wait_fixed=10000)
|
|
def fetchItem(self, data):
|
|
return self.PlexServer.fetchItem(data)
|
|
|
|
@retry(stop_max_attempt_number=6, wait_fixed=10000)
|
|
def server_search(self, data):
|
|
return self.PlexServer.search(data)
|
|
|
|
def get_all_collections(self):
|
|
return self.Plex.search(libtype="collection")
|
|
|
|
def get_collection(self, data):
|
|
collection = util.choose_from_list(self.search(str(data), libtype="collection"), "collection", str(data), exact=True)
|
|
if collection: return collection
|
|
else: raise Failed(f"Plex Error: Collection {data} not found")
|
|
|
|
def validate_collections(self, collections):
|
|
valid_collections = []
|
|
for collection in collections:
|
|
try: valid_collections.append(self.get_collection(collection))
|
|
except Failed as e: logger.error(e)
|
|
if len(valid_collections) == 0:
|
|
raise Failed(f"Collection Error: No valid Plex Collections in {collections}")
|
|
return valid_collections
|
|
|
|
def add_missing(self, collection, items, is_movie):
|
|
col_name = collection.encode("ascii", "replace").decode()
|
|
if col_name not in self.missing:
|
|
self.missing[col_name] = {}
|
|
section = "Movies Missing (TMDb IDs)" if is_movie else "Shows Missing (TVDb IDs)"
|
|
if section not in self.missing[col_name]:
|
|
self.missing[col_name][section] = {}
|
|
for title, item_id in items:
|
|
self.missing[col_name][section][int(item_id)] = str(title).encode("ascii", "replace").decode()
|
|
with open(self.missing_path, "w"): pass
|
|
try:
|
|
yaml.round_trip_dump(self.missing, open(self.missing_path, "w"))
|
|
except yaml.scanner.ScannerError as e:
|
|
logger.error(f"YAML Error: {util.tab_new_lines(e)}")
|
|
|
|
def add_to_collection(self, collection, items, filters, show_filtered, rating_key_map, movie_map, show_map):
|
|
name = collection.title if isinstance(collection, Collections) else collection
|
|
collection_items = collection.items() if isinstance(collection, Collections) else []
|
|
total = len(items)
|
|
max_length = len(str(total))
|
|
length = 0
|
|
for i, item in enumerate(items, 1):
|
|
try:
|
|
current = self.fetchItem(item.ratingKey if isinstance(item, (Movie, Show)) else int(item))
|
|
except (BadRequest, NotFound):
|
|
logger.error(f"Plex Error: Item {item} not found")
|
|
continue
|
|
match = True
|
|
if filters:
|
|
length = util.print_return(length, f"Filtering {(' ' * (max_length - len(str(i)))) + str(i)}/{total} {current.title}")
|
|
for filter_method, filter_data in filters:
|
|
modifier = filter_method[-4:]
|
|
method = util.filter_alias[filter_method[:-4]] if modifier in [".not", ".lte", ".gte"] else util.filter_alias[filter_method]
|
|
if method == "max_age":
|
|
threshold_date = datetime.now() - timedelta(days=filter_data)
|
|
if current.originallyAvailableAt is None or current.originallyAvailableAt < threshold_date:
|
|
match = False
|
|
break
|
|
elif method == "original_language":
|
|
movie = None
|
|
for key, value in movie_map.items():
|
|
if current.ratingKey == value:
|
|
try:
|
|
movie = self.TMDb.get_movie(key)
|
|
break
|
|
except Failed:
|
|
pass
|
|
if movie is None:
|
|
logger.warning(f"Filter Error: No TMDb ID found for {current.title}")
|
|
continue
|
|
if (modifier == ".not" and movie.original_language in filter_data) or (modifier != ".not" and movie.original_language not in filter_data):
|
|
match = False
|
|
break
|
|
elif method == "audio_track_title":
|
|
for media in current.media:
|
|
for part in media.parts:
|
|
for audio in part.audioStreams():
|
|
for check_title in filter_data:
|
|
if (modifier == ".not" and check_title.lower() in audio.title.lower()) or (modifier != ".not" and check_title.lower() not in audio.title.lower()):
|
|
match = False
|
|
break
|
|
if match is False: break
|
|
if match is False: break
|
|
if match is False: break
|
|
if match is False: break
|
|
elif modifier in [".gte", ".lte"]:
|
|
if method == "vote_count":
|
|
tmdb_item = None
|
|
for key, value in movie_map.items():
|
|
if current.ratingKey == value:
|
|
try:
|
|
tmdb_item = self.TMDb.get_movie(key) if self.is_movie else self.TMDb.get_show(key)
|
|
break
|
|
except Failed:
|
|
pass
|
|
if tmdb_item is None:
|
|
logger.warning(f"Filter Error: No TMDb ID found for {current.title}")
|
|
continue
|
|
attr = tmdb_item.vote_count
|
|
else:
|
|
attr = getattr(current, method) / 60000 if method == "duration" else getattr(current, method)
|
|
if (modifier == ".lte" and attr > filter_data) or (modifier == ".gte" and attr < filter_data):
|
|
match = False
|
|
break
|
|
else:
|
|
attrs = []
|
|
if method in ["video_resolution", "audio_language", "subtitle_language"]:
|
|
for media in current.media:
|
|
if method == "video_resolution": attrs.extend([media.videoResolution])
|
|
for part in media.parts:
|
|
if method == "audio_language": attrs.extend([a.language for a in part.audioStreams()])
|
|
if method == "subtitle_language": attrs.extend([s.language for s in part.subtitleStreams()])
|
|
elif method in ["contentRating", "studio", "year", "rating", "originallyAvailableAt"]: attrs = [str(getattr(current, method))]
|
|
elif method in ["actors", "countries", "directors", "genres", "writers", "collections"]: attrs = [getattr(x, "tag") for x in getattr(current, method)]
|
|
|
|
if (not list(set(filter_data) & set(attrs)) and modifier != ".not") or (list(set(filter_data) & set(attrs)) and modifier == ".not"):
|
|
match = False
|
|
break
|
|
length = util.print_return(length, f"Filtering {(' ' * (max_length - len(str(i)))) + str(i)}/{total} {current.title}")
|
|
if match:
|
|
util.print_end(length, f"{name} Collection | {'=' if current in collection_items else '+'} | {current.title}")
|
|
if current in collection_items: rating_key_map[current.ratingKey] = None
|
|
else: current.addCollection(name)
|
|
elif show_filtered is True:
|
|
logger.info(f"{name} Collection | X | {current.title}")
|
|
media_type = f"{'Movie' if self.is_movie else 'Show'}{'s' if total > 1 else ''}"
|
|
util.print_end(length, f"{total} {media_type} Processed")
|
|
return rating_key_map
|
|
|
|
def search_item(self, data, year=None):
|
|
return util.choose_from_list(self.search(data, year=year), "movie" if self.is_movie else "show", str(data), exact=True)
|
|
|
|
def update_metadata(self, TMDb, test):
|
|
logger.info("")
|
|
util.separator(f"{self.name} Library Metadata")
|
|
logger.info("")
|
|
if not self.metadata:
|
|
raise Failed("No metadata to edit")
|
|
for m in self.metadata:
|
|
if test and ("test" not in self.metadata[m] or self.metadata[m]["test"] is not True):
|
|
continue
|
|
logger.info("")
|
|
util.separator()
|
|
logger.info("")
|
|
year = None
|
|
if "year" in self.metadata[m]:
|
|
year = util.check_number(self.metadata[m]["year"], "year", minimum=1800, maximum=datetime.now().year + 1)
|
|
|
|
title = m
|
|
if "title" in self.metadata[m]:
|
|
if self.metadata[m]["title"] is None: logger.error("Metadata Error: title attribute is blank")
|
|
else: title = self.metadata[m]["title"]
|
|
|
|
item = self.search_item(title, year=year)
|
|
|
|
if item is None:
|
|
item = self.search_item(f"{title} (SUB)", year=year)
|
|
|
|
if item is None and "alt_title" in self.metadata[m]:
|
|
if self.metadata[m]["alt_title"] is None:
|
|
logger.error("Metadata Error: alt_title attribute is blank")
|
|
else:
|
|
alt_title = self.metadata[m]["alt_title"]
|
|
item = self.search_item(alt_title, year=year)
|
|
|
|
if item is None:
|
|
logger.error(f"Plex Error: Item {m} not found")
|
|
logger.error(f"Skipping {m}")
|
|
continue
|
|
|
|
item_type = "Movie" if self.is_movie else "Show"
|
|
logger.info(f"Updating {item_type}: {title}...")
|
|
|
|
tmdb_item = None
|
|
try:
|
|
if "tmdb_id" in self.metadata[m]:
|
|
if self.metadata[m]["tmdb_id"] is None: logger.error("Metadata Error: tmdb_id attribute is blank")
|
|
elif self.is_show: logger.error("Metadata Error: tmdb_id attribute only works with movie libraries")
|
|
else: tmdb_item = TMDb.get_show(util.regex_first_int(self.metadata[m]["tmdb_id"], "Show"))
|
|
except Failed as e:
|
|
logger.error(e)
|
|
|
|
originally_available = tmdb_item.first_air_date if tmdb_item else None
|
|
rating = tmdb_item.vote_average if tmdb_item else None
|
|
original_title = tmdb_item.original_name if tmdb_item and tmdb_item.original_name != tmdb_item.name else None
|
|
studio = tmdb_item.networks[0].name if tmdb_item else None
|
|
tagline = tmdb_item.tagline if tmdb_item and len(tmdb_item.tagline) > 0 else None
|
|
summary = tmdb_item.overview if tmdb_item else None
|
|
|
|
edits = {}
|
|
def add_edit(name, current, group, key=None, value=None):
|
|
if value or name in group:
|
|
if value or group[name]:
|
|
if key is None: key = name
|
|
if value is None: value = group[name]
|
|
if str(current) != str(value):
|
|
edits[f"{key}.value"] = value
|
|
edits[f"{key}.locked"] = 1
|
|
logger.info(f"Detail: {name} updated to {value}")
|
|
else:
|
|
logger.error(f"Metadata Error: {name} attribute is blank")
|
|
add_edit("title", item.title, self.metadata[m], value=title)
|
|
add_edit("sort_title", item.titleSort, self.metadata[m], key="titleSort")
|
|
add_edit("originally_available", str(item.originallyAvailableAt)[:-9], self.metadata[m], key="originallyAvailableAt", value=originally_available)
|
|
add_edit("rating", item.rating, self.metadata[m], value=rating)
|
|
add_edit("content_rating", item.contentRating, self.metadata[m], key="contentRating")
|
|
add_edit("original_title", item.originalTitle, self.metadata[m], key="originalTitle", value=original_title)
|
|
add_edit("studio", item.studio, self.metadata[m], value=studio)
|
|
item_tagline = item.tagline if self.is_movie else item._data.attrib.get("tagline")
|
|
add_edit("tagline", item_tagline, self.metadata[m], value=tagline)
|
|
add_edit("summary", item.summary, self.metadata[m], value=summary)
|
|
if len(edits) > 0:
|
|
logger.debug(f"Details Update: {edits}")
|
|
try:
|
|
item.edit(**edits)
|
|
item.reload()
|
|
logger.info(f"{item_type}: {m} Details Update Successful")
|
|
except BadRequest:
|
|
util.print_stacktrace()
|
|
logger.error(f"{item_type}: {m} Details Update Failed")
|
|
else:
|
|
logger.info(f"{item_type}: {m} Details Update Not Needed")
|
|
|
|
genres = []
|
|
|
|
if tmdb_item:
|
|
genres.extend([genre.name for genre in tmdb_item.genres])
|
|
|
|
if "genre" in self.metadata[m]:
|
|
if self.metadata[m]["genre"]: genres.extend(util.get_list(self.metadata[m]["genre"]))
|
|
else: logger.error("Metadata Error: genre attribute is blank")
|
|
|
|
if len(genres) > 0:
|
|
item_genres = [genre.tag for genre in item.genres]
|
|
if "genre_sync_mode" in self.metadata[m]:
|
|
if self.metadata[m]["genre_sync_mode"] is None: logger.error("Metadata Error: genre_sync_mode attribute is blank defaulting to append")
|
|
elif self.metadata[m]["genre_sync_mode"] not in ["append", "sync"]: logger.error("Metadata Error: genre_sync_mode attribute must be either 'append' or 'sync' defaulting to append")
|
|
elif self.metadata[m]["genre_sync_mode"] == "sync":
|
|
for genre in (g for g in item_genres if g not in genres):
|
|
item.removeGenre(genre)
|
|
logger.info(f"Detail: Genre {genre} removed")
|
|
for genre in (g for g in genres if g not in item_genres):
|
|
item.addGenre(genre)
|
|
logger.info(f"Detail: Genre {genre} added")
|
|
|
|
if "label" in self.metadata[m]:
|
|
if self.metadata[m]["label"]:
|
|
item_labels = [label.tag for label in item.labels]
|
|
labels = util.get_list(self.metadata[m]["label"])
|
|
if "label_sync_mode" in self.metadata[m]:
|
|
if self.metadata[m]["label_sync_mode"] is None: logger.error("Metadata Error: label_sync_mode attribute is blank defaulting to append")
|
|
elif self.metadata[m]["label_sync_mode"] not in ["append", "sync"]: logger.error("Metadata Error: label_sync_mode attribute must be either 'append' or 'sync' defaulting to append")
|
|
elif self.metadata[m]["label_sync_mode"] == "sync":
|
|
for label in (la for la in item_labels if la not in labels):
|
|
item.removeLabel(label)
|
|
logger.info(f"Detail: Label {label} removed")
|
|
for label in (la for la in labels if la not in item_labels):
|
|
item.addLabel(label)
|
|
logger.info(f"Detail: Label {label} added")
|
|
else:
|
|
logger.error("Metadata Error: label attribute is blank")
|
|
|
|
if "seasons" in self.metadata[m] and self.is_show:
|
|
if self.metadata[m]["seasons"]:
|
|
for season_id in self.metadata[m]["seasons"]:
|
|
logger.info("")
|
|
logger.info(f"Updating season {season_id} of {m}...")
|
|
if isinstance(season_id, int):
|
|
try: season = item.season(season_id)
|
|
except NotFound: logger.error(f"Metadata Error: Season: {season_id} not found")
|
|
else:
|
|
|
|
if "title" in self.metadata[m]["seasons"][season_id] and self.metadata[m]["seasons"][season_id]["title"]:
|
|
title = self.metadata[m]["seasons"][season_id]["title"]
|
|
else:
|
|
title = season.title
|
|
if "sub" in self.metadata[m]["seasons"][season_id]:
|
|
if self.metadata[m]["seasons"][season_id]["sub"] is None:
|
|
logger.error("Metadata Error: sub attribute is blank")
|
|
elif self.metadata[m]["seasons"][season_id]["sub"] is True and "(SUB)" not in title:
|
|
title = f"{title} (SUB)"
|
|
elif self.metadata[m]["seasons"][season_id]["sub"] is False and title.endswith(" (SUB)"):
|
|
title = title[:-6]
|
|
else:
|
|
logger.error("Metadata Error: sub attribute must be True or False")
|
|
|
|
edits = {}
|
|
add_edit("title", season.title, self.metadata[m]["seasons"][season_id], value=title)
|
|
add_edit("summary", season.summary, self.metadata[m]["seasons"][season_id])
|
|
if len(edits) > 0:
|
|
logger.debug(f"Season: {season_id} Details Update: {edits}")
|
|
try:
|
|
season.edit(**edits)
|
|
season.reload()
|
|
logger.info(f"Season: {season_id} Details Update Successful")
|
|
except BadRequest:
|
|
util.print_stacktrace()
|
|
logger.error(f"Season: {season_id} Details Update Failed")
|
|
else:
|
|
logger.info(f"Season: {season_id} Details Update Not Needed")
|
|
else:
|
|
logger.error(f"Metadata Error: Season: {season_id} invalid, it must be an integer")
|
|
else:
|
|
logger.error("Metadata Error: seasons attribute is blank")
|
|
|
|
if "episodes" in self.metadata[m] and self.is_show:
|
|
if self.metadata[m]["episodes"]:
|
|
for episode_str in self.metadata[m]["episodes"]:
|
|
logger.info("")
|
|
match = re.search("[Ss]\\d+[Ee]\\d+", episode_str)
|
|
if match:
|
|
output = match.group(0)[1:].split("E" if "E" in m.group(0) else "e")
|
|
episode_id = int(output[0])
|
|
season_id = int(output[1])
|
|
logger.info(f"Updating episode S{episode_id}E{season_id} of {m}...")
|
|
try: episode = item.episode(season=season_id, episode=episode_id)
|
|
except NotFound: logger.error(f"Metadata Error: episode {episode_id} of season {season_id} not found")
|
|
else:
|
|
if "title" in self.metadata[m]["episodes"][episode_str] and self.metadata[m]["episodes"][episode_str]["title"]:
|
|
title = self.metadata[m]["episodes"][episode_str]["title"]
|
|
else:
|
|
title = episode.title
|
|
if "sub" in self.metadata[m]["episodes"][episode_str]:
|
|
if self.metadata[m]["episodes"][episode_str]["sub"] is None:
|
|
logger.error("Metadata Error: sub attribute is blank")
|
|
elif self.metadata[m]["episodes"][episode_str]["sub"] is True and "(SUB)" not in title:
|
|
title = f"{title} (SUB)"
|
|
elif self.metadata[m]["episodes"][episode_str]["sub"] is False and title.endswith(" (SUB)"):
|
|
title = title[:-6]
|
|
else:
|
|
logger.error("Metadata Error: sub attribute must be True or False")
|
|
edits = {}
|
|
add_edit("title", episode.title, self.metadata[m]["episodes"][episode_str], value=title)
|
|
add_edit("sort_title", episode.titleSort, self.metadata[m]["episodes"][episode_str], key="titleSort")
|
|
add_edit("rating", episode.rating, self.metadata[m]["episodes"][episode_str])
|
|
add_edit("originally_available", str(episode.originallyAvailableAt)[:-9], self.metadata[m]["episodes"][episode_str], key="originallyAvailableAt")
|
|
add_edit("summary", episode.summary, self.metadata[m]["episodes"][episode_str])
|
|
if len(edits) > 0:
|
|
logger.debug(f"Season: {season_id} Episode: {episode_id} Details Update: {edits}")
|
|
try:
|
|
episode.edit(**edits)
|
|
episode.reload()
|
|
logger.info(
|
|
f"Season: {season_id} Episode: {episode_id} Details Update Successful")
|
|
except BadRequest:
|
|
util.print_stacktrace()
|
|
logger.error(f"Season: {season_id} Episode: {episode_id} Details Update Failed")
|
|
else:
|
|
logger.info(f"Season: {season_id} Episode: {episode_id} Details Update Not Needed")
|
|
else:
|
|
logger.error(f"Metadata Error: episode {episode_str} invalid must have S##E## format")
|
|
else:
|
|
logger.error("Metadata Error: episodes attribute is blank")
|