import os, plexapi, re, requests from datetime import datetime, timedelta from modules import builder, util from modules.library import Library from modules.util import Failed, ImageData from PIL import Image from plexapi import utils from plexapi.audio import Artist, Track, Album from plexapi.exceptions import BadRequest, NotFound, Unauthorized from plexapi.collection import Collection from plexapi.library import Actor from plexapi.playlist import Playlist from plexapi.server import PlexServer from plexapi.video import Movie, Show, Season, Episode from retrying import retry from urllib import parse from xml.etree.ElementTree import ParseError logger = util.logger builders = ["plex_all", "plex_pilots", "plex_collectionless", "plex_search"] search_translation = { "episode_title": "episode.title", "network": "show.network", "critic_rating": "rating", "audience_rating": "audienceRating", "user_rating": "userRating", "episode_user_rating": "episode.userRating", "content_rating": "contentRating", "episode_year": "episode.year", "release": "originallyAvailableAt", "show_unmatched": "show.unmatched", "episode_unmatched": "episode.unmatched", "episode_duplicate": "episode.duplicate", "added": "addedAt", "episode_added": "episode.addedAt", "episode_air_date": "episode.originallyAvailableAt", "plays": "viewCount", "episode_plays": "episode.viewCount", "last_played": "lastViewedAt", "episode_last_played": "episode.lastViewedAt", "unplayed": "unwatched", "episode_unplayed": "episode.unwatched", "subtitle_language": "subtitleLanguage", "audio_language": "audioLanguage", "progress": "inProgress", "episode_progress": "episode.inProgress", "unplayed_episodes": "show.unwatchedLeaves", "season_collection": "season.collection", "episode_collection": "episode.collection", "artist_title": "artist.title", "artist_user_rating": "artist.userRating", "artist_genre": "artist.genre", "artist_collection": "artist.collection", "artist_country": "artist.country", "artist_mood": "artist.mood", "artist_style": "artist.style", "artist_added": "artist.addedAt", "artist_last_played": "artist.lastViewedAt", "artist_unmatched": "artist.unmatched", "album_title": "album.title", "album_year": "album.year", "album_decade": "album.decade", "album_genre": "album.genre", "album_plays": "album.viewCount", "album_last_played": "album.lastViewedAt", "album_user_rating": "album.userRating", "album_critic_rating": "album.rating", "album_record_label": "album.studio", "album_mood": "album.mood", "album_style": "album.style", "album_format": "album.format", "album_type": "album.subformat", "album_collection": "album.collection", "album_added": "album.addedAt", "album_released": "album.originallyAvailableAt", "album_unmatched": "album.unmatched", "album_source": "album.source", "album_label": "album.label", "track_mood": "track.mood", "track_title": "track.title", "track_plays": "track.viewCount", "track_last_played": "track.lastViewedAt", "track_skips": "track.skipCount", "track_last_skipped": "track.lastSkippedAt", "track_user_rating": "track.userRating", "track_last_rated": "track.lastRatedAt", "track_added": "track.addedAt", "track_trash": "track.trash", "track_source": "track.source" } show_translation = { "title": "show.title", "studio": "show.studio", "rating": "show.rating", "audienceRating": "show.audienceRating", "userRating": "show.userRating", "contentRating": "show.contentRating", "year": "show.year", "originallyAvailableAt": "show.originallyAvailableAt", "unmatched": "show.unmatched", "genre": "show.genre", "collection": "show.collection", "actor": "show.actor", "addedAt": "show.addedAt", "viewCount": "show.viewCount", "lastViewedAt": "show.lastViewedAt", "resolution": "episode.resolution", "hdr": "episode.hdr", "subtitleLanguage": "episode.subtitleLanguage", "audioLanguage": "episode.audioLanguage", "trash": "episode.trash", "label": "show.label", } modifier_translation = { "": "", ".not": "!", ".is": "%3D", ".isnot": "!%3D", ".gt": "%3E%3E", ".gte": "%3E", ".lt": "%3C%3C", ".lte": "%3C", ".before": "%3C%3C", ".after": "%3E%3E", ".begins": "%3C", ".ends": "%3E", ".regex": "" } attribute_translation = { "record_label": "studio", "actor": "actors", "audience_rating": "audienceRating", "collection": "collections", "content_rating": "contentRating", "country": "countries", "critic_rating": "rating", "director": "directors", "genre": "genres", "label": "labels", "producer": "producers", "release": "originallyAvailableAt", "added": "addedAt", "last_played": "lastViewedAt", "plays": "viewCount", "user_rating": "userRating", "writer": "writers", "mood": "moods", "style": "styles" } method_alias = { "actors": "actor", "role": "actor", "roles": "actor", "show_actor": "actor", "show_actors": "actor", "show_role": "actor", "show_roles": "actor", "collections": "collection", "plex_collection": "collection", "show_collections": "collection", "show_collection": "collection", "content_ratings": "content_rating", "contentRating": "content_rating", "contentRatings": "content_rating", "countries": "country", "decades": "decade", "directors": "director", "genres": "genre", "labels": "label", "collection_minimum": "minimum_items", "playlist_minimum": "minimum_items", "rating": "critic_rating", "show_user_rating": "user_rating", "video_resolution": "resolution", "tmdb_trending": "tmdb_trending_daily", "play": "plays", "show_plays": "plays", "show_play": "plays", "episode_play": "episode_plays", "originally_available": "release", "episode_originally_available": "episode_air_date", "episode_release": "episode_air_date", "episode_released": "episode_air_date", "show_originally_available": "release", "show_release": "release", "show_air_date": "release", "released": "release", "show_released": "release", "max_age": "release", "studios": "studio", "networks": "network", "producers": "producer", "writers": "writer", "years": "year", "show_year": "year", "show_years": "year", "show_title": "title", "filter": "filters", "seasonyear": "year", "isadult": "adult", "startdate": "start", "enddate": "end", "averagescore": "score", "minimum_tag_percentage": "min_tag_percent", "minimumtagrank": "min_tag_percent", "minimum_tag_rank": "min_tag_percent", "anilist_tag": "anilist_search", "anilist_genre": "anilist_search", "anilist_season": "anilist_search", "mal_producer": "mal_studio", "mal_licensor": "mal_studio", "trakt_recommended": "trakt_recommended_weekly", "trakt_watched": "trakt_watched_weekly", "trakt_collected": "trakt_collected_weekly", "collection_changes_webhooks": "changes_webhooks", "radarr_add": "radarr_add_missing", "sonarr_add": "sonarr_add_missing", "trakt_recommended_personal": "trakt_recommendations" } modifier_alias = {".greater": ".gt", ".less": ".lt"} album_sorting_options = {"default": -1, "newest": 0, "oldest": 1, "name": 2} episode_sorting_options = {"default": -1, "oldest": 0, "newest": 1} keep_episodes_options = {"all": 0, "5_latest": 5, "3_latest": 3, "latest": 1, "past_3": -3, "past_7": -7, "past_30": -30} delete_episodes_options = {"never": 0, "day": 1, "week": 7, "refresh": 100} season_display_options = {"default": -1, "show": 0, "hide": 1} episode_ordering_options = {"default": None, "tmdb_aired": "tmdbAiring", "tvdb_aired": "aired", "tvdb_dvd": "dvd", "tvdb_absolute": "absolute"} plex_languages = ["default", "ar-SA", "ca-ES", "cs-CZ", "da-DK", "de-DE", "el-GR", "en-AU", "en-CA", "en-GB", "en-US", "es-ES", "es-MX", "et-EE", "fa-IR", "fi-FI", "fr-CA", "fr-FR", "he-IL", "hi-IN", "hu-HU", "id-ID", "it-IT", "ja-JP", "ko-KR", "lt-LT", "lv-LV", "nb-NO", "nl-NL", "pl-PL", "pt-BR", "pt-PT", "ro-RO", "ru-RU", "sk-SK", "sv-SE", "th-TH", "tr-TR", "uk-UA", "vi-VN", "zh-CN", "zh-HK", "zh-TW"] metadata_language_options = {lang.lower(): lang for lang in plex_languages} metadata_language_options["default"] = None use_original_title_options = {"default": -1, "no": 0, "yes": 1} collection_order_options = ["release", "alpha", "custom"] collection_filtering_options = ["user", "admin"] collection_mode_options = { "default": "default", "hide": "hide", "hide_items": "hideItems", "hideitems": "hideItems", "show_items": "showItems", "showitems": "showItems" } collection_level_show_options = ["episode", "season"] collection_level_music_options = ["album", "track"] collection_level_options = collection_level_show_options + collection_level_music_options collection_mode_keys = {-1: "default", 0: "hide", 1: "hideItems", 2: "showItems"} collection_order_keys = {0: "release", 1: "alpha", 2: "custom"} item_advance_keys = { "item_album_sorting": ("albumSort", album_sorting_options), "item_episode_sorting": ("episodeSort", episode_sorting_options), "item_keep_episodes": ("autoDeletionItemPolicyUnwatchedLibrary", keep_episodes_options), "item_delete_episodes": ("autoDeletionItemPolicyWatchedLibrary", delete_episodes_options), "item_season_display": ("flattenSeasons", season_display_options), "item_episode_ordering": ("showOrdering", episode_ordering_options), "item_metadata_language": ("languageOverride", metadata_language_options), "item_use_original_title": ("useOriginalTitle", use_original_title_options) } new_plex_agents = ["tv.plex.agents.movie", "tv.plex.agents.series"] and_searches = [ "title.and", "studio.and", "actor.and", "audio_language.and", "collection.and", "content_rating.and", "country.and", "director.and", "genre.and", "label.and", "network.and", "producer.and", "subtitle_language.and", "writer.and" ] or_searches = [ "title", "studio", "actor", "audio_language", "collection", "content_rating", "country", "director", "genre", "label", "network", "producer", "subtitle_language", "writer", "decade", "resolution", "year", "episode_title", "episode_year" ] movie_only_searches = [ "country", "country.not", "director", "director.not", "producer", "producer.not", "writer", "writer.not", "decade", "duplicate", "unplayed", "progress", "duration.gt", "duration.gte", "duration.lt", "duration.lte" ] show_only_searches = [ "network", "network.not", "season_collection", "season_collection.not", "episode_collection", "episode_collection.not", "episode_title", "episode_title.not", "episode_title.is", "episode_title.isnot", "episode_title.begins", "episode_title.ends", "episode_added", "episode_added.not", "episode_added.before", "episode_added.after", "episode_air_date", "episode_air_date.not", "episode_air_date.before", "episode_air_date.after", "episode_last_played", "episode_last_played.not", "episode_last_played.before", "episode_last_played.after", "episode_plays.gt", "episode_plays.gte", "episode_plays.lt", "episode_plays.lte", "episode_user_rating.gt", "episode_user_rating.gte", "episode_user_rating.lt", "episode_user_rating.lte", "episode_year", "episode_year.not", "episode_year.gt", "episode_year.gte", "episode_year.lt", "episode_year.lte", "unplayed_episodes", "episode_unplayed", "episode_duplicate", "episode_progress", "episode_unmatched", "show_unmatched", ] string_attributes = ["title", "studio", "episode_title", "artist_title", "album_title", "album_record_label", "track_title"] string_modifiers = ["", ".not", ".is", ".isnot", ".begins", ".ends"] boolean_attributes = [ "hdr", "unmatched", "duplicate", "unplayed", "progress", "trash", "unplayed_episodes", "episode_unplayed", "episode_duplicate", "episode_progress", "episode_unmatched", "show_unmatched", "artist_unmatched", "album_unmatched", "track_trash" ] tmdb_attributes = ["actor", "director", "producer", "writer"] date_attributes = [ "added", "episode_added", "release", "episode_air_date", "last_played", "episode_last_played", "artist_added", "artist_last_played", "album_last_played", "album_added", "album_released", "track_last_played", "track_last_skipped", "track_last_rated", "track_added" ] date_modifiers = ["", ".not", ".before", ".after"] year_attributes = ["decade", "year", "episode_year", "album_year", "album_decade"] number_attributes = ["plays", "episode_plays", "album_plays", "track_plays", "track_skips"] + year_attributes float_attributes = [ "user_rating", "episode_user_rating", "critic_rating", "audience_rating", "duration", "artist_user_rating", "album_user_rating", "album_critic_rating", "track_user_rating" ] number_modifiers = [".gt", ".gte", ".lt", ".lte"] search_display = {"added": "Date Added", "release": "Release Date", "hdr": "HDR", "progress": "In Progress", "episode_progress": "Episode In Progress"} tag_attributes = [ "actor", "audio_language", "collection", "content_rating", "country", "director", "genre", "label", "network", "producer", "resolution", "studio", "subtitle_language", "writer", "season_collection", "episode_collection", "artist_genre", "artist_collection", "artist_country", "artist_mood", "artist_style", "album_genre", "album_mood", "album_style", "album_format", "album_type", "album_collection", "album_source", "album_label", "track_mood", "track_source" ] tag_modifiers = ["", ".not", ".regex"] no_not_mods = ["resolution", "decade", "album_decade"] searches = boolean_attributes + \ [f"{f}{m}" for f in string_attributes for m in string_modifiers] + \ [f"{f}{m}" for f in tag_attributes + year_attributes for m in tag_modifiers if f not in no_not_mods or m != ".not"] + \ [f"{f}{m}" for f in date_attributes for m in date_modifiers] + \ [f"{f}{m}" for f in number_attributes + float_attributes for m in number_modifiers if f not in no_not_mods] music_searches = [a for a in searches if a.startswith(("artist", "album", "track"))] movie_sorts = { "title.asc": "titleSort", "title.desc": "titleSort%3Adesc", "year.asc": "year", "year.desc": "year%3Adesc", "originally_available.asc": "originallyAvailableAt", "originally_available.desc": "originallyAvailableAt%3Adesc", "release.asc": "originallyAvailableAt", "release.desc": "originallyAvailableAt%3Adesc", "critic_rating.asc": "rating", "critic_rating.desc": "rating%3Adesc", "audience_rating.asc": "audienceRating", "audience_rating.desc": "audienceRating%3Adesc", "user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc", "content_rating.asc": "contentRating", "content_rating.desc": "contentRating%3Adesc", "duration.asc": "duration", "duration.desc": "duration%3Adesc", "progress.asc": "viewOffset", "progress.desc": "viewOffset%3Adesc", "plays.asc": "viewCount", "plays.desc": "viewCount%3Adesc", "added.asc": "addedAt", "added.desc": "addedAt%3Adesc", "viewed.asc": "lastViewedAt", "viewed.desc": "lastViewedAt%3Adesc", "resolution.asc": "mediaHeight", "resolution.desc": "mediaHeight%3Adesc", "bitrate.asc": "mediaBitrate", "bitrate.desc": "mediaBitrate%3Adesc", "random": "random" } show_sorts = { "title.asc": "titleSort", "title.desc": "titleSort%3Adesc", "year.asc": "year", "year.desc": "year%3Adesc", "originally_available.asc": "originallyAvailableAt", "originally_available.desc": "originallyAvailableAt%3Adesc", "release.asc": "originallyAvailableAt", "release.desc": "originallyAvailableAt%3Adesc", "critic_rating.asc": "rating", "critic_rating.desc": "rating%3Adesc", "audience_rating.asc": "audienceRating", "audience_rating.desc": "audienceRating%3Adesc", "user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc", "content_rating.asc": "contentRating", "content_rating.desc": "contentRating%3Adesc", "unplayed.asc": "unviewedLeafCount", "unplayed.desc": "unviewedLeafCount%3Adesc", "episode_added.asc": "episode.addedAt", "episode_added.desc": "episode.addedAt%3Adesc", "added.asc": "addedAt", "added.desc": "addedAt%3Adesc", "viewed.asc": "lastViewedAt", "viewed.desc": "lastViewedAt%3Adesc", "random": "random" } season_sorts = { "season.asc": "season.index%2Cseason.titleSort", "season.desc": "season.index%3Adesc%2Cseason.titleSort", "show.asc": "show.titleSort%2Cindex", "show.desc": "show.titleSort%3Adesc%2Cindex", "user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc", "added.asc": "addedAt", "added.desc": "addedAt%3Adesc", "random": "random" } episode_sorts = { "title.asc": "titleSort", "title.desc": "titleSort%3Adesc", "show.asc": "show.titleSort%2Cseason.index%3AnullsLast%2Cepisode.index%3AnullsLast%2Cepisode.originallyAvailableAt%3AnullsLast%2Cepisode.titleSort%2Cepisode.id", "show.desc": "show.titleSort%3Adesc%2Cseason.index%3AnullsLast%2Cepisode.index%3AnullsLast%2Cepisode.originallyAvailableAt%3AnullsLast%2Cepisode.titleSort%2Cepisode.id", "year.asc": "year", "year.desc": "year%3Adesc", "originally_available.asc": "originallyAvailableAt", "originally_available.desc": "originallyAvailableAt%3Adesc", "release.asc": "originallyAvailableAt", "release.desc": "originallyAvailableAt%3Adesc", "critic_rating.asc": "rating", "critic_rating.desc": "rating%3Adesc", "audience_rating.asc": "audienceRating", "audience_rating.desc": "audienceRating%3Adesc", "user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc", "duration.asc": "duration", "duration.desc": "duration%3Adesc", "progress.asc": "viewOffset", "progress.desc": "viewOffset%3Adesc", "plays.asc": "viewCount", "plays.desc": "viewCount%3Adesc", "added.asc": "addedAt", "added.desc": "addedAt%3Adesc", "viewed.asc": "lastViewedAt", "viewed.desc": "lastViewedAt%3Adesc", "resolution.asc": "mediaHeight", "resolution.desc": "mediaHeight%3Adesc", "bitrate.asc": "mediaBitrate", "bitrate.desc": "mediaBitrate%3Adesc", "random": "random" } artist_sorts = { "title.asc": "titleSort", "title.desc": "titleSort%3Adesc", "user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc", "added.asc": "addedAt", "added.desc": "addedAt%3Adesc", "played.asc": "lastViewedAt", "played.desc": "lastViewedAt%3Adesc", "plays.asc": "viewCount", "plays.desc": "viewCount%3Adesc", "random": "random" } album_sorts = { "title.asc": "titleSort", "title.desc": "titleSort%3Adesc", "album_artist.asc": "artist.titleSort%2Calbum.titleSort%2Calbum.index%2Calbum.id%2Calbum.originallyAvailableAt", "album_artist.desc": "artist.titleSort%3Adesc%2Calbum.titleSort%2Calbum.index%2Calbum.id%2Calbum.originallyAvailableAt", "year.asc": "year", "year.desc": "year%3Adesc", "originally_available.asc": "originallyAvailableAt", "originally_available.desc": "originallyAvailableAt%3Adesc", "release.asc": "originallyAvailableAt", "release.desc": "originallyAvailableAt%3Adesc", "critic_rating.asc": "rating", "critic_rating.desc": "rating%3Adesc", "user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc", "added.asc": "addedAt", "added.desc": "addedAt%3Adesc", "played.asc": "lastViewedAt", "played.desc": "lastViewedAt%3Adesc", "plays.asc": "viewCount", "plays.desc": "viewCount%3Adesc", "random": "random" } track_sorts = { "title.asc": "titleSort", "title.desc": "titleSort%3Adesc", "album_artist.asc": "artist.titleSort%2Calbum.titleSort%2Calbum.year%2Ctrack.absoluteIndex%2Ctrack.index%2Ctrack.titleSort%2Ctrack.id", "album_artist.desc": "artist.titleSort%3Adesc%2Calbum.titleSort%2Calbum.year%2Ctrack.absoluteIndex%2Ctrack.index%2Ctrack.titleSort%2Ctrack.id", "artist.asc": "originalTitle", "artist.desc": "originalTitle%3Adesc", "album.asc": "album.titleSort", "album.desc": "album.titleSort%3Adesc", "user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc", "duration.asc": "duration", "duration.desc": "duration%3Adesc", "plays.asc": "viewCount", "plays.desc": "viewCount%3Adesc", "added.asc": "addedAt", "added.desc": "addedAt%3Adesc", "played.asc": "lastViewedAt", "played.desc": "lastViewedAt%3Adesc", "rated.asc": "lastRatedAt", "rated.desc": "lastRatedAt%3Adesc", "popularity.asc": "ratingCount", "popularity.desc": "ratingCount%3Adesc", "bitrate.asc": "mediaBitrate", "bitrate.desc": "mediaBitrate%3Adesc", "random": "random" } sort_types = { "movies": ("title.asc", 1, movie_sorts), "shows": ("title.asc", 2, show_sorts), "seasons": ("season.asc", 3, season_sorts), "episodes": ("title.asc", 4, episode_sorts), "artists": ("title.asc", 8, artist_sorts), "albums": ("title.asc", 9, album_sorts), "tracks": ("title.asc", 10, track_sorts) } class Plex(Library): def __init__(self, config, params): super().__init__(config, params) self.plex = params["plex"] self.url = params["plex"]["url"] self.token = params["plex"]["token"] self.timeout = params["plex"]["timeout"] logger.secret(self.url) logger.secret(self.token) try: self.PlexServer = PlexServer(baseurl=self.url, token=self.token, session=self.config.session, timeout=self.timeout) except Unauthorized: raise Failed("Plex Error: Plex token is invalid") except ValueError as e: raise Failed(f"Plex Error: {e}") except (requests.exceptions.ConnectionError, ParseError): logger.stacktrace() raise Failed("Plex Error: Plex url is invalid") self.Plex = None library_names = [] for s in self.PlexServer.library.sections(): library_names.append(s.title) if s.title == params["name"]: self.Plex = s break if not self.Plex: raise Failed(f"Plex Error: Plex Library '{params['name']}' not found. Options: {library_names}") if self.Plex.type in ["movie", "show", "artist"]: self.type = self.Plex.type.capitalize() else: raise Failed(f"Plex Error: Plex Library must be a Movies or TV Shows library") self._users = [] self._all_items = [] self.agent = self.Plex.agent self.is_movie = self.type == "Movie" self.is_show = self.type == "Show" self.is_music = self.type == "Artist" self.is_other = self.agent == "com.plexapp.agents.none" if self.is_other and self.type == "Movie": self.type = "Video" if not self.is_music and self.update_blank_track_titles: self.update_blank_track_titles = False logger.error(f"update_blank_track_titles library operation only works with music libraries") def notify(self, text, collection=None, critical=True): self.config.notify(text, server=self.PlexServer.friendlyName, library=self.name, collection=collection, critical=critical) def set_server_preroll(self, preroll): self.PlexServer.settings.get('cinemaTrailersPrerollID').set(preroll) self.PlexServer.settings.save() def get_all_collections(self, label=None): args = "?type=18" if label: label_id = next((c.key for c in self.get_tags("label") if c.title == label), None) if label_id: args = f"{args}&{label_id}" else: return [] return self.get_filter_items(args) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def search(self, title=None, sort=None, maxresults=None, libtype=None, **kwargs): return self.Plex.search(title=title, sort=sort, maxresults=maxresults, libtype=libtype, **kwargs) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def exact_search(self, title, libtype=None, year=None): terms = {"title=": title} if year: terms["year"] = year return self.Plex.search(libtype=libtype, **terms) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def fetchItem(self, data): return self.PlexServer.fetchItem(data) def get_all(self, collection_level=None, load=False): if load and collection_level in [None, "show", "artist", "movie"]: self._all_items = [] if self._all_items and collection_level in [None, "show", "artist", "movie"]: return self._all_items collection_type = collection_level if collection_level else self.Plex.TYPE if not collection_level: collection_level = self.type logger.info(f"Loading All {collection_level.capitalize()}s from Library: {self.name}") key = f"/library/sections/{self.Plex.key}/all?includeGuids=1&type={utils.searchType(collection_type)}" container_start = 0 container_size = plexapi.X_PLEX_CONTAINER_SIZE results = [] while self.Plex._totalViewSize is None or container_start <= self.Plex._totalViewSize: results.extend(self.fetchItems(key, container_start, container_size)) logger.ghost(f"Loaded: {container_start}/{self.Plex._totalViewSize}") container_start += container_size logger.info(f"Loaded {self.Plex._totalViewSize} {collection_level.capitalize()}s") self._all_items = results return results def upload_theme(self, collection, url=None, filepath=None): key = f"/library/metadata/{collection.ratingKey}/themes" if url: self.PlexServer.query(f"{key}?url={parse.quote_plus(url)}", method=self.PlexServer._session.post) elif filepath: self.PlexServer.query(key, method=self.PlexServer._session.post, data=open(filepath, 'rb').read()) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def create_playlist(self, name, items): return self.PlexServer.createPlaylist(name, items=items) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def fetchItems(self, key, container_start, container_size): return self.Plex.fetchItems(key, container_start=container_start, container_size=container_size) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def moveItem(self, obj, item, after): obj.moveItem(item, after=after) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def query(self, method): return method() @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def query_data(self, method, data): return method(data) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed) def query_collection(self, item, collection, locked=True, add=True): if add: item.addCollection(collection, locked=locked) else: item.removeCollection(collection, locked=locked) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def collection_mode_query(self, collection, data): collection.modeUpdate(mode=data) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def collection_order_query(self, collection, data): collection.sortUpdate(sort=data) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def reload(self, item, force=False): is_full = False cached_item = item if cached_item.ratingKey in self.cached_items: cached_item, is_full = self.cached_items[cached_item.ratingKey] try: if not is_full or force: cached_item.reload(checkFiles=False, includeAllConcerts=False, includeBandwidths=False, includeChapters=False, includeChildren=False, includeConcerts=False, includeExternalMedia=False, includeExtras=False, includeFields=False, includeGeolocation=False, includeLoudnessRamps=False, includeMarkers=False, includeOnDeck=False, includePopularLeaves=False, includeRelated=False, includeRelatedCount=0, includeReviews=False, includeStations=False) self.cached_items[cached_item.ratingKey] = (cached_item, True) return cached_item except (BadRequest, NotFound) as e: logger.stacktrace() raise Failed(f"Item Failed to Load: {e}") @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def edit_query(self, item, edits, advanced=False): if advanced: item.editAdvanced(**edits) else: item.edit(**edits) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def _upload_image(self, item, image): try: if image.is_poster and image.is_url: item.uploadPoster(url=image.location) elif image.is_poster: item.uploadPoster(filepath=image.location) elif image.is_url: item.uploadArt(url=image.location) else: item.uploadArt(filepath=image.location) self.reload(item, force=True) except BadRequest as e: item.refresh() raise Failed(e) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def upload_poster(self, item, image, url=False): if url: item.uploadPoster(url=image) else: item.uploadPoster(filepath=image) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed) def get_actor_id(self, name): results = self.Plex.hubSearch(name) for result in results: if isinstance(result, Actor) and result.librarySectionID == self.Plex.key and result.tag == name: return result.id def get_search_choices(self, search_name, title=True, name_pairs=False): final_search = search_translation[search_name] if search_name in search_translation else search_name final_search = show_translation[final_search] if self.is_show and final_search in show_translation else final_search try: names = [] choices = {} use_title = title and final_search not in ["contentRating", "audioLanguage", "subtitleLanguage", "resolution"] for choice in self.get_tags(final_search): if choice.title not in names: names.append((choice.title, choice.key) if name_pairs else choice.title) choices[choice.title] = choice.title if use_title else choice.key choices[choice.key] = choice.title if use_title else choice.key choices[choice.title.lower()] = choice.title if use_title else choice.key choices[choice.key.lower()] = choice.title if use_title else choice.key return choices, names except NotFound: logger.debug(f"Search Attribute: {final_search}") raise Failed(f"Plex Error: plex_search attribute: {search_name} not supported") @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def get_tags(self, tag): return self.Plex.listFilterChoices(field=tag) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) def _query(self, key, post=False, put=False): if post: method = self.Plex._server._session.post elif put: method = self.Plex._server._session.put else: method = None return self.Plex._server.query(key, method=method) @property def users(self): if not self._users: users = [] for user in self.PlexServer.myPlexAccount().users(): if self.PlexServer.machineIdentifier in [s.machineIdentifier for s in user.servers]: users.append(user.title) self._users = users return self._users def delete_user_playlist(self, title, user): self.PlexServer.switchUser(user).playlist(title).delete() def playlist_report(self): playlists = {} def scan_user(server, username): for playlist in server.playlists(): if playlist.title not in playlists: playlists[playlist.title] = [] playlists[playlist.title].append(username) scan_user(self.PlexServer, self.PlexServer.myPlexAccount().title) for user in self.users: scan_user(self.PlexServer.switchUser(user), user) return playlists def manage_recommendations(self): return [(r.title, r._data.attrib.get('identifier'), r._data.attrib.get('promotedToRecommended'), r._data.attrib.get('promotedToOwnHome'), r._data.attrib.get('promotedToSharedHome')) for r in self.Plex.fetchItems(f"/hubs/sections/{self.Plex.key}/manage")] def alter_collection(self, item, collection, smart_label_collection=False, add=True): if smart_label_collection: self.query_data(item.addLabel if add else item.removeLabel, collection) else: locked = True if self.agent in ["tv.plex.agents.movie", "tv.plex.agents.series"]: field = next((f for f in item.fields if f.name == "collection"), None) locked = field is not None self.query_collection(item, collection, locked=locked, add=add) def move_item(self, collection, item, after=None): key = f"{collection.key}/items/{item}/move" if after: key += f"?after={after}" self._query(key, put=True) def smart_label_check(self, label): return label in [la.title for la in self.get_tags("label")] def test_smart_filter(self, uri_args): logger.debug(f"Smart Collection Test: {uri_args}") test_items = self.get_filter_items(uri_args) if len(test_items) < 1: raise Failed(f"Plex Error: No items for smart filter: {uri_args}") def create_smart_collection(self, title, smart_type, uri_args): self.test_smart_filter(uri_args) args = { "type": smart_type, "title": title, "smart": 1, "sectionId": self.Plex.key, "uri": self.build_smart_filter(uri_args) } self._query(f"/library/collections{utils.joinArgs(args)}", post=True) def create_blank_collection(self, title): args = { "type": 1 if self.is_movie else 2 if self.is_show else 8, "title": title, "smart": 0, "sectionId": self.Plex.key, "uri": f"{self.PlexServer._uriRoot()}/library/metadata" } self._query(f"/library/collections{utils.joinArgs(args)}", post=True) def get_smart_filter_from_uri(self, uri): smart_filter = parse.parse_qs(parse.urlparse(uri.replace("/#!/", "/")).query)["key"][0] args = smart_filter[smart_filter.index("?"):] return self.build_smart_filter(args), int(args[args.index("type=") + 5:args.index("type=") + 6]) def build_smart_filter(self, uri_args): return f"{self.PlexServer._uriRoot()}/library/sections/{self.Plex.key}/all{uri_args}" def update_smart_collection(self, collection, uri_args): self.test_smart_filter(uri_args) self._query(f"/library/collections/{collection.ratingKey}/items{utils.joinArgs({'uri': self.build_smart_filter(uri_args)})}", put=True) def smart_filter(self, collection): smart_filter = self.get_collection(collection).content return smart_filter[smart_filter.index("?"):] def collection_visibility(self, collection): try: attrs = self._query(f"/hubs/sections/{self.Plex.key}/manage?metadataItemId={collection.ratingKey}")[0].attrib return { "library": utils.cast(bool, attrs.get("promotedToRecommended", "0")), "home": utils.cast(bool, attrs.get("promotedToOwnHome", "0")), "shared": utils.cast(bool, attrs.get("promotedToSharedHome", "0")) } except IndexError: return {"library": False, "home": False, "shared": False} def collection_visibility_update(self, collection, visibility=None, library=None, home=None, shared=None): if visibility is None: visibility = self.collection_visibility(collection) key = f"/hubs/sections/{self.Plex.key}/manage?metadataItemId={collection.ratingKey}" key += f"&promotedToRecommended={1 if (library is None and visibility['library']) or library else 0}" key += f"&promotedToOwnHome={1 if (home is None and visibility['home']) or home else 0}" key += f"&promotedToSharedHome={1 if (shared is None and visibility['shared']) or shared else 0}" self._query(key, post=True) def get_playlist(self, title): try: return self.PlexServer.playlist(title) except NotFound: raise Failed(f"Plex Error: Playlist {title} not found") def get_collection(self, data): if isinstance(data, int): return self.fetchItem(data) elif isinstance(data, Collection): return data else: cols = self.search(title=str(data), libtype="collection") for d in cols: if d.title == data: return d logger.debug("") for d in cols: logger.debug(f"Found: {d.title}") logger.debug(f"Looking for: {data}") raise Failed(f"Plex Error: Collection {data} not found") def validate_collections(self, collections): valid_collections = [] for collection in collections: try: valid_collections.append(self.get_collection(collection)) except Failed as e: logger.error(e) if len(valid_collections) == 0: raise Failed(f"Collection Error: No valid Plex Collections in {collections}") return valid_collections def get_rating_keys(self, method, data): items = [] if method == "plex_all": logger.info(f"Processing Plex All {data.capitalize()}s") items = self.get_all(collection_level=data) elif method == "plex_pilots": logger.info(f"Processing Plex Pilot {data.capitalize()}s") items = [] for item in self.get_all(): try: items.append(item.episode(season=1, episode=1)) except NotFound: logger.warning(f"Plex Warning: {item.title} has no Season 1 Episode 1 ") elif method == "plex_search": logger.info(f"Processing {data[1]}") items = self.get_filter_items(data[2]) elif method == "plex_collectionless": good_collections = [] logger.info(f"Processing Plex Collectionless") logger.info("Collections Excluded") for col in self.get_all_collections(): keep_collection = True for pre in data["exclude_prefix"]: if col.title.startswith(pre) or (col.titleSort and col.titleSort.startswith(pre)): keep_collection = False logger.info(f"{col.title} excluded by prefix match {pre}") break if keep_collection: for ext in data["exclude"]: if col.title == ext or (col.titleSort and col.titleSort == ext): keep_collection = False logger.info(f"{col.title} excluded by exact match") break if keep_collection: logger.info(f"Collection Passed: {col.title}") good_collections.append(col) logger.info("") logger.info("Collections Not Excluded (Items in these collections are not added to Collectionless)") for col in good_collections: logger.info(col.title) collection_indexes = [c.index for c in good_collections] all_items = self.get_all() for i, item in enumerate(all_items, 1): logger.ghost(f"Processing: {i}/{len(all_items)} {item.title}") add_item = True for collection in item.collections: if collection.id in collection_indexes: add_item = False break if add_item: items.append(item) logger.info(f"Processed {len(all_items)} {self.type}s") else: raise Failed(f"Plex Error: Method {method} not supported") if len(items) > 0: ids = [(item.ratingKey, "ratingKey") for item in items] logger.debug("") logger.debug(f"{len(ids)} Keys Found: {ids}") return ids else: raise Failed("Plex Error: No Items found in Plex") def get_collection_items(self, collection, smart_label_collection): if smart_label_collection: return self.search(label=collection.title if isinstance(collection, Collection) else str(collection)) elif isinstance(collection, (Collection, Playlist)): if collection.smart: return self.get_filter_items(self.smart_filter(collection)) else: return self.query(collection.items) else: return [] def get_filter_items(self, uri_args): key = f"/library/sections/{self.Plex.key}/all{uri_args}" logger.debug(key) return self.Plex._search(key, None, 0, plexapi.X_PLEX_CONTAINER_SIZE) def get_collection_name_and_items(self, collection, smart_label_collection): name = collection.title if isinstance(collection, (Collection, Playlist)) else str(collection) return name, self.get_collection_items(collection, smart_label_collection) def get_tmdb_from_map(self, item): return self.movie_rating_key_map[item.ratingKey] if item.ratingKey in self.movie_rating_key_map else None def get_tvdb_from_map(self, item): return self.show_rating_key_map[item.ratingKey] if item.ratingKey in self.show_rating_key_map else None def search_item(self, data, year=None): kwargs = {} if year is not None: kwargs["year"] = year for d in self.search(title=str(data), **kwargs): if d.title == data: return d return None def edit_advance(self, item, edits): try: self.edit_query(item, edits, advanced=True) if "languageOverride" in edits or "useOriginalTitle" in edits: self.query(item.refresh) return True except BadRequest: logger.stacktrace() return False def edit_tags(self, attr, obj, add_tags=None, remove_tags=None, sync_tags=None, do_print=True): display = "" final = "" key = attribute_translation[attr] if attr in attribute_translation else attr attr_display = attr.replace("_", " ").title() attr_call = attr_display.replace(" ", "") if add_tags or remove_tags or sync_tags is not None: _add_tags = add_tags if add_tags else [] _remove_tags = [t.lower() for t in remove_tags] if remove_tags else [] _sync_tags = [t.lower() for t in sync_tags] if sync_tags else [] try: obj = self.reload(obj) _item_tags = [item_tag.tag.lower() for item_tag in getattr(obj, key)] except BadRequest: _item_tags = [] _add = [f"{t[:1].upper()}{t[1:]}" for t in _add_tags + _sync_tags if t.lower() not in _item_tags] _remove = [t for t in _item_tags if (sync_tags is not None and t not in _sync_tags) or t in _remove_tags] if _add: self.query_data(getattr(obj, f"add{attr_call}"), _add) display += f"+{', +'.join(_add)}" if _remove: self.query_data(getattr(obj, f"remove{attr_call}"), _remove) display += f"-{', -'.join(_remove)}" final = f"{obj.title[:25]:<25} | {attr_display} | {display}" if display else display if do_print and final: logger.info(final) return final def item_images(self, item, group, alias, asset_location=None, top_item=None, title=None, image_name=None): if title is None: title = item.title posters, backgrounds = util.get_image_dicts(group, alias) try: asset_poster, asset_background, item_dir, _ = self.find_item_assets(item, item_asset_directory=asset_location, top_item=top_item) if asset_poster: posters["asset_directory"] = asset_poster if asset_background: backgrounds["asset_directory"] = asset_background if asset_location is None: asset_location = item_dir except Failed as e: logger.warning(e) poster = util.pick_image(title, posters, self.prioritize_assets, self.download_url_assets, asset_location, image_name=image_name) background = util.pick_image(title, backgrounds, self.prioritize_assets, self.download_url_assets, asset_location, is_poster=False, image_name=f"{image_name}_background" if image_name else image_name) if poster and poster.attribute == "asset_directory": poster = None if background and background.attribute == "asset_directory": background = None if poster or background: self.upload_images(item, poster=poster, background=background) return asset_location def find_item_assets(self, item, item_asset_directory=None, asset_directory=None, top_item=None): poster = None background = None folder_name = None if asset_directory is None: asset_directory = self.asset_directory is_top_level = isinstance(item, (Movie, Artist, Show, Collection, Playlist, str)) if isinstance(item, Album): prefix = f"{top_item.title} Album {item.title}'s " file_name = item.title elif isinstance(item, Season): prefix = f"{top_item.title} Season {item.seasonNumber}'s " file_name = f"Season{'0' if item.seasonNumber < 10 else ''}{item.seasonNumber}" elif isinstance(item, Episode): prefix = f"{top_item.title} {item.seasonEpisode.upper()}'s " file_name = item.seasonEpisode.upper() else: prefix = f"{item if isinstance(item, str) else item.title}'s " file_name = "poster" if not item_asset_directory: if isinstance(item, (Movie, Artist, Album, Show, Episode, Season)): starting = item.show() if isinstance(item, (Episode, Season)) else item path_test = str(starting.locations[0]) if not os.path.dirname(path_test): path_test = path_test.replace("\\", "/") folder_name = os.path.basename(os.path.dirname(path_test) if isinstance(starting, Movie) else path_test) elif isinstance(item, (Collection, Playlist)): folder_name, _ = util.validate_filename(item.title) else: folder_name, _ = util.validate_filename(item) if not self.asset_folders: file_name = folder_name if file_name == "poster" else f"{folder_name}_{file_name}" for ad in asset_directory: if self.asset_folders: if os.path.isdir(os.path.join(ad, folder_name)): item_asset_directory = os.path.join(ad, folder_name) else: for n in range(1, self.asset_depth + 1): new_path = ad for i in range(1, n + 1): new_path = os.path.join(new_path, "*") matches = util.glob_filter(os.path.join(new_path, folder_name)) if len(matches) > 0: item_asset_directory = os.path.abspath(matches[0]) break else: matches = util.glob_filter(os.path.join(ad, f"{file_name}.*")) if len(matches) > 0: item_asset_directory = ad if item_asset_directory: break if not item_asset_directory: extra = "" if self.create_asset_folders and self.asset_folders: item_asset_directory = os.path.join(asset_directory[0], folder_name) os.makedirs(item_asset_directory, exist_ok=True) extra = f"\nAsset Directory Created: {item_asset_directory}" raise Failed(f"Asset Warning: Unable to find asset {'folder' if self.asset_folders else 'file'}: '{folder_name if self.asset_folders else file_name}{extra}'") poster_filter = os.path.join(item_asset_directory, f"{file_name}.*") background_filter = os.path.join(item_asset_directory, "background.*" if file_name == "poster" else f"{file_name}_background.*") poster_matches = util.glob_filter(poster_filter) if len(poster_matches) > 0: poster = ImageData("asset_directory", os.path.abspath(poster_matches[0]), prefix=prefix, is_url=False) background_matches = util.glob_filter(background_filter) if len(background_matches) > 0: background = ImageData("asset_directory", os.path.abspath(background_matches[0]), prefix=prefix, is_poster=False, is_url=False) if is_top_level and self.asset_folders and self.dimensional_asset_rename and (not poster or not background): for file in util.glob_filter(os.path.join(item_asset_directory, "*.*")): if file.lower().endswith((".jpg", ".png", ".jpeg")): try: image = Image.open(file) _w, _h = image.size image.close() if not poster and _h >= _w: new_path = os.path.join(os.path.dirname(file), f"poster{os.path.splitext(file)[1].lower()}") os.rename(file, new_path) poster = ImageData("asset_directory", os.path.abspath(new_path), prefix=prefix, is_url=False) elif not background and _w > _h: new_path = os.path.join(os.path.dirname(file), f"background{os.path.splitext(file)[1].lower()}") os.rename(file, new_path) background = ImageData("asset_directory", os.path.abspath(new_path), prefix=prefix, is_poster=False, is_url=False) if poster and background: break except OSError: logger.error(f"Asset Error: Failed to open image: {file}") return poster, background, item_asset_directory, folder_name def get_ids(self, item): tmdb_id = None tvdb_id = None imdb_id = None if self.config.Cache: t_id, i_id, guid_media_type, _ = self.config.Cache.query_guid_map(item.guid) if t_id: if "movie" in guid_media_type: tmdb_id = t_id[0] else: tvdb_id = t_id[0] if i_id: imdb_id = i_id[0] if not tmdb_id and not tvdb_id: tmdb_id = self.get_tmdb_from_map(item) if not tmdb_id and not tvdb_id and self.is_show: tvdb_id = self.get_tvdb_from_map(item) return tmdb_id, tvdb_id, imdb_id def get_locked_attributes(self, item, titles=None): attrs = {} fields = {f.name: f for f in item.fields if f.locked} if isinstance(item, (Movie, Show)) and titles and titles.count(item.title) > 1: map_key = f"{item.title} ({item.year})" attrs["title"] = item.title attrs["year"] = item.year elif isinstance(item, (Season, Episode, Track)) and item.index: map_key = int(item.index) else: map_key = item.title if "title" in fields: if isinstance(item, (Movie, Show)): tmdb_id, tvdb_id, imdb_id = self.get_ids(item) tmdb_item = self.config.TMDb.get_item(item, tmdb_id, tvdb_id, imdb_id, is_movie=isinstance(item, Movie)) if tmdb_item: attrs["alt_title"] = tmdb_item.title elif isinstance(item, (Season, Episode, Track)): attrs["title"] = item.title def check_field(plex_key, pmm_key, var_key=None): if plex_key in fields and pmm_key not in self.metadata_backup["exclude"]: if not var_key: var_key = plex_key if hasattr(item, var_key): plex_value = getattr(item, var_key) if isinstance(plex_value, list): plex_tags = [t.tag for t in plex_value] if len(plex_tags) > 0 or self.metadata_backup["sync_tags"]: attrs[f"{pmm_key}.sync" if self.metadata_backup["sync_tags"] else pmm_key] = None if not plex_tags else plex_tags[0] if len(plex_tags) == 1 else plex_tags elif isinstance(plex_value, datetime): attrs[pmm_key] = datetime.strftime(plex_value, "%Y-%m-%d") else: attrs[pmm_key] = plex_value check_field("titleSort", "sort_title") check_field("originalTitle", "original_artist" if self.is_music else "original_title") check_field("originallyAvailableAt", "originally_available") check_field("contentRating", "content_rating") check_field("userRating", "user_rating") check_field("audienceRating", "audience_rating") check_field("rating", "critic_rating") check_field("studio", "record_label" if self.is_music else "studio") check_field("tagline", "tagline") check_field("summary", "summary") check_field("index", "track") check_field("parentIndex", "disc") check_field("director", "director", var_key="directors") check_field("country", "country", var_key="countries") check_field("genre", "genre", var_key="genres") check_field("writer", "writer", var_key="writers") check_field("producer", "producer", var_key="producers") check_field("collection", "collection", var_key="collections") check_field("label", "label", var_key="labels") check_field("mood", "mood", var_key="moods") check_field("style", "style", var_key="styles") check_field("similar", "similar_artist") if self.type in util.advance_tags_to_edit: for advance_edit in util.advance_tags_to_edit[self.type]: key, options = item_advance_keys[f"item_{advance_edit}"] if advance_edit in self.metadata_backup["exclude"] or not hasattr(item, key): continue keys = {v: k for k, v in options.items()} if keys[getattr(item, key)] not in ["default", "all", "never"]: attrs[advance_edit] = keys[getattr(item, key)] def _recur(sub): sub_items = {} for sub_item in getattr(item, sub)(): sub_item_key, sub_item_attrs = self.get_locked_attributes(sub_item) if sub_item_attrs: sub_items[sub_item_key] = sub_item_attrs if sub_items: attrs[sub] = sub_items if isinstance(item, Show): _recur("seasons") elif isinstance(item, Season): _recur("episodes") elif isinstance(item, Artist): _recur("albums") elif isinstance(item, Album): _recur("tracks") return map_key, attrs def split(self, text): attribute, modifier = os.path.splitext(str(text).lower()) attribute = method_alias[attribute] if attribute in method_alias else attribute modifier = modifier_alias[modifier] if modifier in modifier_alias else modifier if attribute == "add_to_arr": attribute = "radarr_add_missing" if self.is_movie else "sonarr_add_missing" elif attribute in ["arr_tag", "arr_folder"]: attribute = f"{'rad' if self.is_movie else 'son'}{attribute}" elif attribute in builder.date_attributes and modifier in [".gt", ".gte"]: modifier = ".after" elif attribute in builder.date_attributes and modifier in [".lt", ".lte"]: modifier = ".before" final = f"{attribute}{modifier}" if text != final: logger.warning(f"Collection Warning: {text} attribute will run as {final}") return attribute, modifier, final def check_filters(self, item, filters_in, current_time): for filter_method, filter_data in filters_in: filter_attr, modifier, filter_final = self.split(filter_method) if self.check_filter(item, filter_attr, modifier, filter_final, filter_data, current_time) is False: return False return True def check_filter(self, item, filter_attr, modifier, filter_final, filter_data, current_time): filter_actual = attribute_translation[filter_attr] if filter_attr in attribute_translation else filter_attr if isinstance(item, Movie): item_type = "movie" elif isinstance(item, Show): item_type = "show" elif isinstance(item, Season): item_type = "season" elif isinstance(item, Episode): item_type = "episode" elif isinstance(item, Artist): item_type = "artist" elif isinstance(item, Album): item_type = "album" elif isinstance(item, Track): item_type = "track" else: return True item = self.reload(item) if filter_attr not in builder.filters[item_type]: return True elif filter_attr in builder.date_filters: if util.is_date_filter(getattr(item, filter_actual), modifier, filter_data, filter_final, current_time): return False elif filter_attr in builder.string_filters: values = [] if filter_attr == "audio_track_title": for media in item.media: for part in media.parts: values.extend( [a.extendedDisplayTitle for a in part.audioStreams() if a.extendedDisplayTitle]) elif filter_attr == "filepath": values = [loc for loc in item.locations] else: values = [getattr(item, filter_actual)] if util.is_string_filter(values, modifier, filter_data): return False elif filter_attr in builder.boolean_filters: filter_check = False if filter_attr == "has_collection": filter_check = len(item.collections) > 0 elif filter_attr == "has_overlay": for label in item.labels: if label.tag.lower().endswith(" overlay") or label.tag.lower() == "overlay": filter_check = True break elif filter_attr == "has_dolby_vision": for media in item.media: for part in media.parts: for stream in part.videoStreams(): if stream.DOVIPresent: filter_check = True break if util.is_boolean_filter(filter_data, filter_check): return False elif filter_attr == "history": item_date = item.originallyAvailableAt if item_date is None: return False elif filter_data == "day": if item_date.month != current_time.month or item_date.day != current_time.day: return False elif filter_data == "month": if item_date.month != current_time.month: return False else: date_match = False for i in range(filter_data): check_date = current_time - timedelta(days=i) if item_date.month == check_date.month and item_date.day == check_date.day: date_match = True if date_match is False: return False elif filter_attr in ["seasons", "episodes", "albums", "tracks"]: if filter_attr == "seasons": sub_items = item.seasons() elif filter_attr == "albums": sub_items = item.albums() elif filter_attr == "tracks": sub_items = item.tracks() else: sub_items = item.episodes() filters_in = [] percentage = 60 for sub_atr, sub_data in filter_data.items(): if sub_atr == "percentage": percentage = sub_data else: filters_in.append((sub_atr, sub_data)) failure_threshold = len(sub_items) * ((100 - percentage) / 100) failures = 0 for sub_item in sub_items: if self.check_filters(sub_item, filters_in, current_time) is False: failures += 1 if failures > failure_threshold: return False elif modifier in [".gt", ".gte", ".lt", ".lte", ".count_gt", ".count_gte", ".count_lt", ".count_lte"]: divider = 60000 if filter_attr == "duration" else 1 test_number = [] if filter_attr == "resolution": for media in item.media: test_number.append(media.videoResolution) elif filter_attr == "audio_language": for media in item.media: for part in media.parts: test_number.extend([a.language for a in part.audioStreams()]) elif filter_attr == "subtitle_language": for media in item.media: for part in media.parts: test_number.extend([s.language for s in part.subtitleStreams()]) else: test_number = getattr(item, filter_actual) if modifier in [".count_gt", ".count_gte", ".count_lt", ".count_lte"]: test_number = len(test_number) if test_number else 0 modifier = f".{modifier[7:]}" if test_number is None or util.is_number_filter(test_number / divider, modifier, filter_data): return False else: attrs = [] if filter_attr in ["resolution", "audio_language", "subtitle_language"]: for media in item.media: if filter_attr == "resolution": attrs.append(media.videoResolution) for part in media.parts: if filter_attr == "audio_language": attrs.extend([a.language for a in part.audioStreams()]) if filter_attr == "subtitle_language": attrs.extend([s.language for s in part.subtitleStreams()]) elif filter_attr in ["content_rating", "year", "rating"]: attrs = [getattr(item, filter_actual)] elif filter_attr in ["actor", "country", "director", "genre", "label", "producer", "writer", "collection", "network"]: attrs = [attr.tag for attr in getattr(item, filter_actual)] else: raise Failed(f"Filter Error: filter: {filter_final} not supported") if modifier == ".regex": has_match = False for reg in filter_data: for name in attrs: if re.compile(reg).search(name): has_match = True if has_match is False: return False elif (not list(set(filter_data) & set(attrs)) and modifier == "") \ or (list(set(filter_data) & set(attrs)) and modifier == ".not"): return False return True