import logging
import os
import re
import requests
from datetime import datetime, timedelta
from lxml import html
from modules import util
from modules.radarr import RadarrAPI
from modules.sonarr import SonarrAPI
from modules.tautulli import TautulliAPI
from modules.util import Failed
from plexapi.exceptions import BadRequest, NotFound, Unauthorized
from plexapi.library import Collections, MovieSection, ShowSection
from plexapi.server import PlexServer
from plexapi.video import Movie, Show
from retrying import retry
from ruamel import yaml

logger = logging.getLogger("Plex Meta Manager")

# Matches an episode key such as "S01E02" / "s1e2"; group 1 is the season
# number and group 2 is the episode number.
_EPISODE_PATTERN = re.compile(r"[Ss](\d+)[Ee](\d+)")


# NOTE(review): the copy of this file that was reviewed had lost several tokens
# (`plexapi.video`, `self.data`, `self.name`, `logger.info(`, `self.Plex.search(`,
# `self.PlexServer.search(`, `datetime.now()`, `current.media`, `media.parts`,
# `re.search(`, `match.group(0)`). They were restored from context; confirm
# against version control before merging.
class PlexAPI:
    """Wrapper around one Plex library section plus its optional companion services.

    Holds the Plex connection, the parsed metadata YAML file, and optional
    Radarr / Sonarr / Tautulli clients for a single configured library.
    """

    def __init__(self, params):
        """Connect to Plex, locate the library and load its metadata file.

        Args:
            params: Mapping read with the keys "plex" (with "url" and "token"),
                "library_type", "name", "metadata_path", "asset_directory",
                "sync_mode", "tmdb", "tvdb", "radarr", "sonarr" and "tautulli".

        Raises:
            Failed: If the Plex connection fails, the library cannot be found,
                the YAML file cannot be scanned, or the file contains neither
                a usable "metadata" nor a usable "collections" attribute.
        """
        try:
            self.PlexServer = PlexServer(params["plex"]["url"], params["plex"]["token"], timeout=600)
        except Unauthorized:
            raise Failed("Plex Error: Plex token is invalid")
        except ValueError as e:
            raise Failed("Plex Error: {}".format(e))
        except requests.exceptions.ConnectionError:
            util.print_stacktrace()
            raise Failed("Plex Error: Plex url is invalid")

        self.is_movie = params["library_type"] == "movie"
        self.is_show = params["library_type"] == "show"

        # The section must match both the configured name and the library type.
        self.Plex = next(
            (
                s for s in self.PlexServer.library.sections()
                if s.title == params["name"]
                and (
                    (self.is_movie and isinstance(s, MovieSection))
                    or (self.is_show and isinstance(s, ShowSection))
                )
            ),
            None,
        )
        if not self.Plex:
            raise Failed("Plex Error: Plex Library {} not found".format(params["name"]))

        try:
            # `with` guarantees the handle is closed even when parsing fails.
            with open(params["metadata_path"], encoding="utf-8") as handle:
                self.data, ind, bsi = yaml.util.load_yaml_guess_indent(handle)
        except yaml.scanner.ScannerError as e:
            raise Failed("YAML Error: {}".format(str(e).replace("\n", "\n|\t      ")))

        self.metadata = self._read_section("metadata")
        self.collections = self._read_section("collections")
        if self.metadata is None and self.collections is None:
            raise Failed("YAML Error: metadata attributes or collections attribute required")

        if params["asset_directory"]:
            logger.info("Using Asset Directory: {}".format(params["asset_directory"]))

        # Each companion service is optional; a failed connection is logged and
        # leaves the corresponding attribute as None.
        self.Radarr = None
        if params["tmdb"] and params["radarr"]:
            logger.info("Connecting to {} library's Radarr...".format(params["name"]))
            try:
                self.Radarr = RadarrAPI(params["tmdb"], params["radarr"])
            except Failed as e:
                logger.error(e)
            logger.info("{} library's Radarr Connection {}".format(params["name"], "Failed" if self.Radarr is None else "Successful"))

        self.Sonarr = None
        if params["tvdb"] and params["sonarr"]:
            logger.info("Connecting to {} library's Sonarr...".format(params["name"]))
            try:
                self.Sonarr = SonarrAPI(params["tvdb"], params["sonarr"], self.Plex.language)
            except Failed as e:
                logger.error(e)
            logger.info("{} library's Sonarr Connection {}".format(params["name"], "Failed" if self.Sonarr is None else "Successful"))

        self.Tautulli = None
        if params["tautulli"]:
            logger.info("Connecting to {} library's Tautulli...".format(params["name"]))
            try:
                self.Tautulli = TautulliAPI(params["tautulli"])
            except Failed as e:
                logger.error(e)
            logger.info("{} library's Tautulli Connection {}".format(params["name"], "Failed" if self.Tautulli is None else "Successful"))

        self.name = params["name"]
        # "<metadata file stem>_missing.yml", stored next to the metadata file.
        self.missing_path = os.path.join(
            os.path.dirname(os.path.abspath(params["metadata_path"])),
            "{}_missing.yml".format(os.path.splitext(os.path.basename(params["metadata_path"]))[0]),
        )
        self.metadata_path = params["metadata_path"]
        self.asset_directory = params["asset_directory"]
        self.sync_mode = params["sync_mode"]
        self.plex = params["plex"]
        self.radarr = params["radarr"]
        self.sonarr = params["sonarr"]
        self.tautulli = params["tautulli"]

    def _read_section(self, attribute):
        """Return ``self.data[attribute]``, or None (with a warning) if missing or blank."""
        # An empty YAML file parses to None; treat that like a missing attribute
        # instead of failing on the membership test.
        if not self.data or attribute not in self.data:
            logger.warning("Config Warning: {} attribute not found".format(attribute))
            return None
        if not self.data[attribute]:
            logger.warning("Config Warning: {} attribute is blank".format(attribute))
            return None
        return self.data[attribute]

    @retry(stop_max_attempt_number=6, wait_fixed=10000)
    def search(self, title, libtype=None, year=None):
        """Search the library section by title, passing libtype/year only when given."""
        kwargs = {}
        if libtype is not None:
            kwargs["libtype"] = libtype
        if year is not None:
            kwargs["year"] = year
        return self.Plex.search(title, **kwargs)

    @retry(stop_max_attempt_number=6, wait_fixed=10000)
    def fetchItem(self, data):
        """Fetch a single item from the Plex server (retried on failure)."""
        return self.PlexServer.fetchItem(data)

    @retry(stop_max_attempt_number=6, wait_fixed=10000)
    def server_search(self, data):
        """Run a server-wide search (retried on failure)."""
        return self.PlexServer.search(data)

    def get_all_collections(self):
        """Return every collection in the library section."""
        return self.Plex.search(libtype="collection")

    def get_collection(self, data):
        """Return the collection whose name matches ``data`` exactly.

        Raises:
            Failed: If no collection is chosen.
        """
        collection = util.choose_from_list(self.search(str(data), libtype="collection"), "collection", str(data), exact=True)
        if collection:
            return collection
        raise Failed("Plex Error: Collection {} not found".format(data))

    def get_item(self, data, year=None):
        """Resolve ``data`` to a library item.

        Args:
            data: A rating key (int), a Movie/Show object, or a title to search.
            year: Optional year used to narrow a title search.

        Raises:
            Failed: If the item cannot be fetched or no search result is chosen.
        """
        if isinstance(data, (int, Movie, Show)):
            try:
                return self.fetchItem(data.ratingKey if isinstance(data, (Movie, Show)) else data)
            except BadRequest:
                raise Failed("Plex Error: Item {} not found".format(data))
        # search() already omits the year argument when it is None.
        item_list = self.search(data, year=year)
        item = util.choose_from_list(item_list, "movie" if self.is_movie else "show", data)
        if item:
            return item
        raise Failed("Plex Error: Item {} not found".format(data))

    def validate_collections(self, collections):
        """Return the Plex collections found for the given names.

        Names that cannot be resolved are logged and skipped.

        Raises:
            Failed: If none of the names resolves to a collection.
        """
        valid_collections = []
        for collection in collections:
            try:
                valid_collections.append(self.get_collection(collection))
            except Failed as e:
                logger.error(e)
        if not valid_collections:
            # The original formatted `collections[c][m]`, where `c` and `m` are
            # undefined here, so this path raised NameError instead of Failed.
            raise Failed("Collection Error: No valid Plex Collections in {}".format(collections))
        return valid_collections

    def get_actor_rating_key(self, data):
        """Return the rating key of the actor named ``data``.

        Takes the first "Movie" result of a server-wide search and scans that
        movie's roles for a matching name.

        Raises:
            Failed: If no matching role is found.
        """
        movie_rating_key = None
        for result in self.server_search(data):
            # presumably str(result) looks like "<Type:ratingKey:Title>" - verify against plexapi
            entry = str(result).split(":")
            entry[0] = entry[0][1:]
            if entry[0] == "Movie":
                movie_rating_key = int(entry[1])
                break
        if movie_rating_key:
            wanted = data.upper().replace(" ", "-")
            for role in self.fetchItem(movie_rating_key).roles:
                role = str(role).split(":")
                if wanted == role[2][:-1].upper():
                    return int(role[1])
        raise Failed("Plex Error: Actor: {} not found".format(data))

    @staticmethod
    def _missing_key(collection):
        """Return the ASCII-sanitised collection name used as a key in the missing file."""
        return collection.encode("ascii", "replace").decode()

    def _edit_missing(self, mutate):
        """Load the missing file, apply ``mutate(missing_data)`` and write it back.

        The file is created when absent. A YAML scanner error is logged and the
        file is left untouched.
        """
        if not os.path.exists(self.missing_path):
            with open(self.missing_path, "w"):
                pass
        try:
            with open(self.missing_path) as handle:
                missing_data, ind, bsi = yaml.util.load_yaml_guess_indent(handle)
            if not missing_data:
                missing_data = {}
            mutate(missing_data)
            with open(self.missing_path, "w") as handle:
                yaml.round_trip_dump(missing_data, handle, indent=ind, block_seq_indent=bsi)
        except yaml.scanner.ScannerError as e:
            logger.error("YAML Error: {}".format(str(e).replace("\n", "\n|\t      ")))

    def del_collection_if_empty(self, collection):
        """Remove the collection's entry from the missing file when it has no content."""
        # Entries are stored under the sanitised name (see save_missing), so
        # the lookup has to use the same key.
        col_name = self._missing_key(collection)

        def prune(missing_data):
            if col_name in missing_data and len(missing_data[col_name]) == 0:
                del missing_data[col_name]

        self._edit_missing(prune)

    def clear_collection_missing(self, collection):
        """Reset the collection's entry in the missing file to an empty mapping."""
        col_name = self._missing_key(collection)

        def clear(missing_data):
            if col_name in missing_data:
                missing_data[col_name] = {}

        self._edit_missing(clear)

    def save_missing(self, collection, items, is_movie):
        """Record missing items for a collection in the missing file.

        Args:
            collection: Collection name.
            items: Iterable of ``(title, item_id)`` pairs.
            is_movie: Selects the "Movies Missing (TMDb IDs)" section when true,
                otherwise "Shows Missing (TVDb IDs)".
        """
        col_name = self._missing_key(collection)
        section = "Movies Missing (TMDb IDs)" if is_movie else "Shows Missing (TVDb IDs)"

        def record(missing_data):
            if col_name not in missing_data:
                missing_data[col_name] = {}
            if section not in missing_data[col_name]:
                missing_data[col_name][section] = {}
            for title, item_id in items:
                missing_data[col_name][section][int(item_id)] = str(title).encode("ascii", "replace").decode()

        self._edit_missing(record)

    @staticmethod
    def _matches_filters(current, filters):
        """Return True when ``current`` passes every ``(name, value)`` filter."""
        for f in filters:
            modifier = f[0][-4:]
            method = util.filter_alias[f[0][:-4]] if modifier in [".not", ".lte", ".gte"] else util.filter_alias[f[0]]
            if method == "max_age":
                threshold_date = datetime.now() - timedelta(days=f[1])
                attr = getattr(current, "originallyAvailableAt")
                if attr is None or attr < threshold_date:
                    return False
            elif modifier in [".gte", ".lte"]:
                if method == "originallyAvailableAt":
                    threshold = datetime.strptime(f[1], "%m/%d/%y")
                elif method in ["year", "rating"]:
                    threshold = f[1]
                else:
                    continue
                attr = getattr(current, method)
                # A missing attribute cannot satisfy a comparison; comparing
                # None against the threshold would raise TypeError.
                if attr is None:
                    return False
                if (modifier == ".lte" and attr > threshold) or (modifier == ".gte" and attr < threshold):
                    return False
            else:
                terms = f[1] if isinstance(f[1], list) else str(f[1]).split(", ")
                attrs = []
                if method in ["video_resolution", "audio_language", "subtitle_language"]:
                    # Accumulate across every media/part; the original
                    # overwrote the list and kept only the last one.
                    for media in current.media:
                        if method == "video_resolution":
                            attrs.append(media.videoResolution)
                        for part in media.parts:
                            if method == "audio_language":
                                attrs.extend(a.language for a in part.audioStreams())
                            if method == "subtitle_language":
                                attrs.extend(s.language for s in part.subtitleStreams())
                elif method in ["contentRating", "studio", "year", "rating", "originallyAvailableAt"]:
                    attrs = [str(getattr(current, method))]
                elif method in ["actors", "countries", "directors", "genres", "writers", "collections"]:
                    attrs = [getattr(x, "tag") for x in getattr(current, method)]
                found = bool(set(terms) & set(attrs))
                # A plain filter needs an overlap; a ".not" filter needs none.
                if found == (modifier == ".not"):
                    return False
        return True

    def add_to_collection(self, collection, items, filters, map=None):
        """Add every item that passes ``filters`` to a collection.

        Args:
            collection: A Collections object or a collection name.
            items: Items accepted by :meth:`get_item`.
            filters: Sequence of ``(name, value)`` filters; falsy to disable.
            map: Optional dict; rating keys of items that are already in the
                collection are set to None in it.

        Returns:
            The ``map`` dict (a new one when none was passed).
        """
        # A `{}` default would be shared between calls.
        if map is None:
            map = {}
        name = collection.title if isinstance(collection, Collections) else collection
        collection_items = collection.items() if isinstance(collection, Collections) else []
        total = len(items)
        max_length = len(str(total))
        length = 0
        for i, item in enumerate(items, 1):
            current = self.get_item(item)
            match = True
            if filters:
                progress = "Filtering {}/{} {}".format(str(i).rjust(max_length), total, current.title)
                length = util.print_return(length, progress)
                match = self._matches_filters(current, filters)
                length = util.print_return(length, progress)
            if match:
                already_present = current in collection_items
                util.print_end(length, "{} Collection | {} | {}".format(name, "=" if already_present else "+", current.title))
                if already_present:
                    map[current.ratingKey] = None
                else:
                    current.addCollection(name)
        media_type = "{}{}".format("Movie" if self.is_movie else "Show", "s" if total > 1 else "")
        util.print_end(length, "{} {} Processed".format(total, media_type))
        return map

    @staticmethod
    def _add_edit(edits, name, group, key=None, value=None, sub=None):
        """Queue a locked field edit in ``edits``.

        Args:
            edits: Dict receiving "<key>.value" and "<key>.locked" entries.
            name: Attribute name looked up in ``group``.
            group: Mapping of attributes for the item/season/episode.
            key: Plex field name; defaults to ``name``.
            value: Explicit value; defaults to ``group[name]``.
            sub: When truthy and ``group`` has a "sub" attribute, a " (SUB)"
                suffix is appended (sub: true) or stripped (sub: false).
        """
        if not value and name not in group:
            return
        if not value and not group[name]:
            logger.error("Metadata Error: {} attribute is blank".format(name))
            return
        if key is None:
            key = name
        if value is None:
            value = group[name]
        if sub and "sub" in group:
            flag = group["sub"]
            if flag is True:
                if "(SUB)" not in str(value):
                    value = "{} (SUB)".format(value)
            elif flag is False:
                # Unreachable in the original, which reported `sub: false` as blank.
                if str(value).endswith(" (SUB)"):
                    value = str(value)[:-6]
            elif not flag:
                logger.error("Metadata Error: sub attribute is blank")
        edits["{}.value".format(key)] = value
        edits["{}.locked".format(key)] = 1

    @staticmethod
    def _sync_tags(group, attribute, existing, add, remove):
        """Apply the ``attribute`` ("genre" or "label") tag list from ``group``.

        Args:
            group: Metadata mapping for the item.
            attribute: Attribute name; "<attribute>_sync_mode" selects "append"
                (default) or "sync", where "sync" also removes unlisted tags.
            existing: Tags currently on the item.
            add: Callable that adds one tag to the item.
            remove: Callable that removes one tag from the item.
        """
        if attribute not in group:
            return
        if not group[attribute]:
            logger.error("Metadata Error: {} attribute is blank".format(attribute))
            return
        mode_key = "{}_sync_mode".format(attribute)
        display = attribute.capitalize()
        sync = False
        if mode_key in group:
            if group[mode_key] is None:
                logger.error("Metadata Error: {} attribute is blank defaulting to append".format(mode_key))
            elif group[mode_key] not in ["append", "sync"]:
                logger.error("Metadata Error: {} attribute must be either 'append' or 'sync' defaulting to append".format(mode_key))
            elif group[mode_key] == "sync":
                sync = True
        values = util.get_list(group[attribute])
        if sync:
            for tag in [t for t in existing if t not in values]:
                remove(tag)
                logger.info("Detail: {} {} removed".format(display, tag))
        for tag in [v for v in values if v not in existing]:
            add(tag)
            logger.info("Detail: {} {} added".format(display, tag))

    def update_metadata(self):
        """Apply the "metadata" section of the YAML file to the library.

        For each entry the item is looked up by its key (or "alt_title"), its
        detail fields, genres and labels are edited, and for show libraries
        the "seasons" and "episodes" sub-entries are edited as well. Problems
        with a single entry are logged and processing continues.

        Raises:
            Failed: If there is no metadata to edit.
        """
        logger.info("")
        util.seperator("{} Library Metadata".format(self.name))
        logger.info("")
        if not self.metadata:
            raise Failed("No metadata to edit")
        kind = "Movie" if self.is_movie else "Show"
        for m in self.metadata:
            logger.info("")
            util.seperator()
            logger.info("")
            group = self.metadata[m]

            year = None
            if "year" in group:
                now = datetime.now()
                if group["year"] is None:
                    logger.error("Metadata Error: year attribute is blank")
                elif not isinstance(group["year"], int):
                    logger.error("Metadata Error: year attribute must be an integer")
                elif group["year"] not in range(1800, now.year + 2):
                    logger.error("Metadata Error: year attribute must be between 1800-{}".format(now.year + 1))
                else:
                    year = group["year"]

            alt_title = None
            used_alt = False
            if "alt_title" in group:
                if group["alt_title"] is None:
                    logger.error("Metadata Error: alt_title attribute is blank")
                else:
                    alt_title = group["alt_title"]

            try:
                item = self.get_item(m, year=year)
            except Failed as e:
                if not alt_title:
                    logger.error(e)
                    logger.error("Skipping {}".format(m))
                    continue
                try:
                    item = self.get_item(alt_title, year=year)
                    used_alt = True
                except Failed as alt_e:
                    logger.error(alt_e)
                    logger.error("Skipping {}".format(m))
                    continue
            shown_title = alt_title if used_alt else m

            logger.info("Updating {}: {}...".format(kind, shown_title))
            edits = {}
            # The title is only rewritten when the item was found through its
            # alt_title or a sub attribute is present.
            if used_alt or "sub" in group:
                self._add_edit(edits, "title", group, value=m, sub=True)
            self._add_edit(edits, "sort_title", group, key="titleSort")
            self._add_edit(edits, "originally_available", group, key="originallyAvailableAt")
            self._add_edit(edits, "rating", group)
            self._add_edit(edits, "content_rating", group, key="contentRating")
            self._add_edit(edits, "original_title", group, key="originalTitle")
            self._add_edit(edits, "studio", group)
            self._add_edit(edits, "tagline", group)
            self._add_edit(edits, "summary", group)
            try:
                item.edit(**edits)
                item.reload()
                logger.info("{}: {} Details Update Successful".format(kind, m))
            except BadRequest:
                logger.error("{}: {} Details Update Failed".format(kind, m))
                logger.debug("Details Update: {}".format(edits))
                util.print_stacktrace()

            if "genre" in group:
                self._sync_tags(group, "genre", [genre.tag for genre in item.genres], item.addGenre, item.removeGenre)
            if "label" in group:
                # The original called item.addLabel(v) with the generator's
                # loop variable, which is undefined at that point.
                self._sync_tags(group, "label", [label.tag for label in item.labels], item.addLabel, item.removeLabel)

            if "seasons" in group and self.is_show:
                if group["seasons"]:
                    for season_id in group["seasons"]:
                        logger.info("")
                        logger.info("Updating season {} of {}...".format(season_id, shown_title))
                        if not isinstance(season_id, int):
                            logger.error("Metadata Error: Season: {} invalid, it must be an integer".format(season_id))
                            continue
                        try:
                            season = item.season(season_id)
                        except NotFound:
                            logger.error("Metadata Error: Season: {} not found".format(season_id))
                            continue
                        season_group = group["seasons"][season_id]
                        edits = {}
                        self._add_edit(edits, "title", season_group, sub=True)
                        self._add_edit(edits, "summary", season_group)
                        try:
                            season.edit(**edits)
                            season.reload()
                            logger.info("Season: {} Details Update Successful".format(season_id))
                        except BadRequest:
                            logger.debug("Season: {} Details Update: {}".format(season_id, edits))
                            logger.error("Season: {} Details Update Failed".format(season_id))
                            util.print_stacktrace()
                else:
                    logger.error("Metadata Error: seasons attribute is blank")

            if "episodes" in group and self.is_show:
                if group["episodes"]:
                    for episode_str in group["episodes"]:
                        logger.info("")
                        match = _EPISODE_PATTERN.search(str(episode_str))
                        if not match:
                            logger.error("Metadata Error: episode {} invlaid must have S##E## format".format(episode_str))
                            continue
                        # In "S01E02" the first number is the season and the
                        # second the episode; the original had them swapped.
                        season_id = int(match.group(1))
                        episode_id = int(match.group(2))
                        logger.info("Updating episode S{}E{} of {}...".format(season_id, episode_id, shown_title))
                        try:
                            episode = item.episode(season=season_id, episode=episode_id)
                        except NotFound:
                            logger.error("Metadata Error: episode {} of season {} not found".format(episode_id, season_id))
                            continue
                        episode_group = group["episodes"][episode_str]
                        edits = {}
                        self._add_edit(edits, "title", episode_group, sub=True)
                        self._add_edit(edits, "sort_title", episode_group, key="titleSort")
                        self._add_edit(edits, "rating", episode_group)
                        self._add_edit(edits, "originally_available", episode_group, key="originallyAvailableAt")
                        self._add_edit(edits, "summary", episode_group)
                        try:
                            episode.edit(**edits)
                            episode.reload()
                            logger.info("Season: {} Episode: {} Details Update Successful".format(season_id, episode_id))
                        except BadRequest:
                            logger.debug("Season: {} Episode: {} Details Update: {}".format(season_id, episode_id, edits))
                            logger.error("Season: {} Episode: {} Details Update Failed".format(season_id, episode_id))
                            util.print_stacktrace()
                else:
                    logger.error("Metadata Error: episodes attribute is blank")