You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Plex-Meta-Manager/modules/plex.py

1704 lines
86 KiB

import os, plexapi, re, requests
from datetime import datetime, timedelta
from modules import builder, util
3 years ago
from modules.library import Library
from modules.util import Failed, ImageData
from PIL import Image
from plexapi import utils
from plexapi.audio import Artist, Track, Album
4 years ago
from plexapi.exceptions import BadRequest, NotFound, Unauthorized
from plexapi.collection import Collection
from plexapi.library import Role, FilterChoice
from plexapi.playlist import Playlist
4 years ago
from plexapi.server import PlexServer
from plexapi.video import Movie, Show, Season, Episode
4 years ago
from retrying import retry
from urllib import parse
from xml.etree.ElementTree import ParseError
4 years ago
logger = util.logger
4 years ago
builders = ["plex_all", "plex_watchlist", "plex_pilots", "plex_collectionless", "plex_search"]
library_types = ["movie", "show", "artist"]
search_translation = {
"episode_title": "episode.title",
"network": "show.network",
"edition": "editionTitle",
"critic_rating": "rating",
"audience_rating": "audienceRating",
"episode_critic_rating": "episode.rating",
"episode_audience_rating": "episode.audienceRating",
"user_rating": "userRating",
"episode_user_rating": "episode.userRating",
"content_rating": "contentRating",
"episode_year": "episode.year",
"release": "originallyAvailableAt",
"show_unmatched": "show.unmatched",
"episode_unmatched": "episode.unmatched",
"episode_duplicate": "episode.duplicate",
"added": "addedAt",
"episode_added": "episode.addedAt",
"episode_air_date": "episode.originallyAvailableAt",
"plays": "viewCount",
"episode_plays": "episode.viewCount",
"last_played": "lastViewedAt",
"episode_last_played": "episode.lastViewedAt",
"unplayed": "unwatched",
"episode_unplayed": "episode.unwatched",
"subtitle_language": "subtitleLanguage",
"audio_language": "audioLanguage",
"progress": "inProgress",
"episode_progress": "episode.inProgress",
"unplayed_episodes": "show.unwatchedLeaves",
"season_collection": "season.collection",
"episode_collection": "episode.collection",
"season_label": "season.label",
"episode_label": "episode.label",
"artist_title": "artist.title",
"artist_user_rating": "artist.userRating",
"artist_genre": "artist.genre",
"artist_collection": "artist.collection",
"artist_country": "artist.country",
"artist_mood": "artist.mood",
"artist_style": "artist.style",
"artist_added": "artist.addedAt",
"artist_last_played": "artist.lastViewedAt",
"artist_unmatched": "artist.unmatched",
"artist_label": "artist.label",
"album_title": "album.title",
"album_year": "album.year",
"album_decade": "album.decade",
"album_genre": "album.genre",
"album_plays": "album.viewCount",
"album_last_played": "album.lastViewedAt",
"album_user_rating": "album.userRating",
"album_critic_rating": "album.rating",
"album_record_label": "album.studio",
"album_mood": "album.mood",
"album_style": "album.style",
"album_format": "album.format",
"album_type": "album.subformat",
"album_collection": "album.collection",
"album_added": "album.addedAt",
"album_released": "album.originallyAvailableAt",
"album_unmatched": "album.unmatched",
"album_source": "album.source",
"album_label": "album.label",
"track_mood": "track.mood",
"track_title": "track.title",
"track_plays": "track.viewCount",
"track_last_played": "track.lastViewedAt",
"track_skips": "track.skipCount",
"track_last_skipped": "track.lastSkippedAt",
"track_user_rating": "track.userRating",
"track_last_rated": "track.lastRatedAt",
"track_added": "track.addedAt",
"track_trash": "track.trash",
"track_source": "track.source",
"track_label": "track.label"
}
3 years ago
show_translation = {
"title": "show.title",
"studio": "show.studio",
"rating": "show.rating",
"audienceRating": "show.audienceRating",
"userRating": "show.userRating",
"contentRating": "show.contentRating",
"year": "show.year",
"originallyAvailableAt": "show.originallyAvailableAt",
"unmatched": "show.unmatched",
"genre": "show.genre",
"collection": "show.collection",
"actor": "show.actor",
"addedAt": "show.addedAt",
"viewCount": "show.viewCount",
"lastViewedAt": "show.lastViewedAt",
"resolution": "episode.resolution",
3 years ago
"hdr": "episode.hdr",
"subtitleLanguage": "episode.subtitleLanguage",
"audioLanguage": "episode.audioLanguage",
"trash": "episode.trash",
"label": "show.label",
3 years ago
}
modifier_translation = {
"": "", ".not": "!", ".is": "%3D", ".isnot": "!%3D", ".gt": "%3E%3E", ".gte": "%3E", ".lt": "%3C%3C", ".lte": "%3C",
2 years ago
".before": "%3C%3C", ".after": "%3E%3E", ".begins": "%3C", ".ends": "%3E", ".regex": "", ".rated": ""
}
attribute_translation = {
"aspect": "aspectRatio",
"channels": "audioChannels",
"audio_codec": "audioCodec",
"audio_profile ": "audioProfile",
"video_codec": "videoCodec",
"video_profile": "videoProfile",
"resolution": "videoResolution",
"record_label": "studio",
2 years ago
"similar_artist": "similar",
"actor": "actors",
"audience_rating": "audienceRating",
"collection": "collections",
"content_rating": "contentRating",
"country": "countries",
"critic_rating": "rating",
"director": "directors",
"genre": "genres",
"label": "labels",
"producer": "producers",
"release": "originallyAvailableAt",
"originally_available": "originallyAvailableAt",
"added": "addedAt",
"last_played": "lastViewedAt",
"plays": "viewCount",
"user_rating": "userRating",
"writer": "writers",
"mood": "moods",
"style": "styles",
"episode_number": "episodeNumber",
"season_number": "seasonNumber",
"original_title": "originalTitle",
"edition": "editionTitle",
"runtime": "duration",
"season_title": "parentTitle",
"episode_count": "leafCount",
"versions": "media"
}
method_alias = {
"actors": "actor", "role": "actor", "roles": "actor",
"show_actor": "actor", "show_actors": "actor", "show_role": "actor", "show_roles": "actor",
"collections": "collection", "plex_collection": "collection",
"show_collections": "collection", "show_collection": "collection",
"content_ratings": "content_rating", "contentRating": "content_rating", "contentRatings": "content_rating",
"countries": "country",
"decades": "decade",
"directors": "director",
"genres": "genre",
"labels": "label",
"collection_minimum": "minimum_items",
"playlist_minimum": "minimum_items",
"save_missing": "save_report",
"rating": "critic_rating",
"show_user_rating": "user_rating",
"video_resolution": "resolution",
"tmdb_trending": "tmdb_trending_daily",
"play": "plays", "show_plays": "plays", "show_play": "plays", "episode_play": "episode_plays",
"originally_available": "release", "episode_originally_available": "episode_air_date",
"episode_release": "episode_air_date", "episode_released": "episode_air_date",
"show_originally_available": "release", "show_release": "release", "show_air_date": "release",
"released": "release", "show_released": "release", "max_age": "release",
"studios": "studio",
"networks": "network",
"producers": "producer",
"writers": "writer",
"years": "year", "show_year": "year", "show_years": "year",
"show_title": "title", "filter": "filters",
"seasonyear": "year", "isadult": "adult", "startdate": "start", "enddate": "end", "averagescore": "score",
"minimum_tag_percentage": "min_tag_percent", "minimumtagrank": "min_tag_percent", "minimum_tag_rank": "min_tag_percent",
"anilist_tag": "anilist_search", "anilist_genre": "anilist_search", "anilist_season": "anilist_search",
"mal_producer": "mal_studio", "mal_licensor": "mal_studio",
"trakt_recommended": "trakt_recommended_weekly", "trakt_watched": "trakt_watched_weekly", "trakt_collected": "trakt_collected_weekly",
"collection_changes_webhooks": "changes_webhooks",
"radarr_add": "radarr_add_missing", "sonarr_add": "sonarr_add_missing",
"trakt_recommended_personal": "trakt_recommendations",
"collection_level": "builder_level", "overlay_level": "builder_level",
}
modifier_alias = {".greater": ".gt", ".less": ".lt"}
date_sub_mods = {"s": "Seconds", "m": "Minutes", "h": "Hours", "d": "Days", "w": "Weeks", "o": "Months", "y": "Years"}
album_sorting_options = {"default": -1, "newest": 0, "oldest": 1, "name": 2}
episode_sorting_options = {"default": -1, "oldest": 0, "newest": 1}
keep_episodes_options = {"all": 0, "5_latest": 5, "3_latest": 3, "latest": 1, "past_3": -3, "past_7": -7, "past_30": -30}
delete_episodes_options = {"never": 0, "day": 1, "week": 7, "refresh": 100}
season_display_options = {"default": -1, "show": 0, "hide": 1}
4 years ago
episode_ordering_options = {"default": None, "tmdb_aired": "tmdbAiring", "tvdb_aired": "aired", "tvdb_dvd": "dvd", "tvdb_absolute": "absolute"}
plex_languages = ["default", "ar-SA", "ca-ES", "cs-CZ", "da-DK", "de-DE", "el-GR", "en-AU", "en-CA", "en-GB", "en-US",
"es-ES", "es-MX", "et-EE", "fa-IR", "fi-FI", "fr-CA", "fr-FR", "he-IL", "hi-IN", "hu-HU", "id-ID",
"it-IT", "ja-JP", "ko-KR", "lt-LT", "lv-LV", "nb-NO", "nl-NL", "pl-PL", "pt-BR", "pt-PT", "ro-RO",
"ru-RU", "sk-SK", "sv-SE", "th-TH", "tr-TR", "uk-UA", "vi-VN", "zh-CN", "zh-HK", "zh-TW"]
metadata_language_options = {lang.lower(): lang for lang in plex_languages}
metadata_language_options["default"] = None
4 years ago
use_original_title_options = {"default": -1, "no": 0, "yes": 1}
collection_order_options = ["release", "alpha", "custom"]
collection_filtering_options = ["user", "admin"]
collection_mode_options = {
"default": "default", "hide": "hide",
"hide_items": "hideItems", "hideitems": "hideItems",
"show_items": "showItems", "showitems": "showItems"
}
builder_level_show_options = ["episode", "season"]
builder_level_music_options = ["album", "track"]
builder_level_options = builder_level_show_options + builder_level_music_options
collection_mode_keys = {-1: "default", 0: "hide", 1: "hideItems", 2: "showItems"}
collection_order_keys = {0: "release", 1: "alpha", 2: "custom"}
item_advance_keys = {
"item_album_sorting": ("albumSort", album_sorting_options),
"item_episode_sorting": ("episodeSort", episode_sorting_options),
"item_keep_episodes": ("autoDeletionItemPolicyUnwatchedLibrary", keep_episodes_options),
"item_delete_episodes": ("autoDeletionItemPolicyWatchedLibrary", delete_episodes_options),
"item_season_display": ("flattenSeasons", season_display_options),
"item_episode_ordering": ("showOrdering", episode_ordering_options),
"item_metadata_language": ("languageOverride", metadata_language_options),
"item_use_original_title": ("useOriginalTitle", use_original_title_options)
}
new_plex_agents = ["tv.plex.agents.movie", "tv.plex.agents.series"]
and_searches = [
"title.and", "studio.and", "actor.and", "audio_language.and", "collection.and",
"content_rating.and", "country.and", "director.and", "genre.and", "label.and",
"network.and", "producer.and", "subtitle_language.and", "writer.and"
]
or_searches = [
"title", "studio", "actor", "audio_language", "collection", "content_rating",
"country", "director", "genre", "label", "network", "producer", "subtitle_language",
"writer", "decade", "resolution", "year", "episode_title", "episode_year"
]
movie_only_searches = [
3 years ago
"country", "country.not", "director", "director.not", "producer", "producer.not", "writer", "writer.not",
"decade", "duplicate", "unplayed", "progress",
"duration.gt", "duration.gte", "duration.lt", "duration.lte"
"edition", "edition.not", "edition.is", "edition.isnot", "edition.begins", "edition.ends"
]
show_only_searches = [
3 years ago
"network", "network.not",
"season_collection", "season_collection.not",
"episode_collection", "episode_collection.not",
"season_label", "season_label.not",
"episode_label", "episode_label.not",
"episode_title", "episode_title.not", "episode_title.is", "episode_title.isnot", "episode_title.begins", "episode_title.ends",
"episode_added", "episode_added.not", "episode_added.before", "episode_added.after",
"episode_air_date", "episode_air_date.not",
"episode_air_date.before", "episode_air_date.after",
"episode_last_played", "episode_last_played.not", "episode_last_played.before", "episode_last_played.after",
"episode_plays.gt", "episode_plays.gte", "episode_plays.lt", "episode_plays.lte",
2 years ago
"episode_user_rating.gt", "episode_user_rating.gte", "episode_user_rating.lt", "episode_user_rating.lte", "episode_user_rating.rated",
"episode_critic_rating.gt", "episode_critic_rating.gte", "episode_critic_rating.lt", "episode_critic_rating.lte", "episode_critic_rating.rated",
"episode_audience_rating.gt", "episode_audience_rating.gte", "episode_audience_rating.lt", "episode_audience_rating.lte", "episode_audience_rating.rated",
"episode_year", "episode_year.not", "episode_year.gt", "episode_year.gte", "episode_year.lt", "episode_year.lte",
"unplayed_episodes", "episode_unplayed", "episode_duplicate", "episode_progress", "episode_unmatched", "show_unmatched",
]
string_attributes = ["title", "studio", "edition", "episode_title", "artist_title", "album_title", "album_record_label", "track_title"]
string_modifiers = ["", ".not", ".is", ".isnot", ".begins", ".ends"]
3 years ago
boolean_attributes = [
"hdr", "unmatched", "duplicate", "unplayed", "progress", "trash", "unplayed_episodes", "episode_unplayed",
"episode_duplicate", "episode_progress", "episode_unmatched", "show_unmatched", "artist_unmatched", "album_unmatched", "track_trash"
]
3 years ago
tmdb_attributes = ["actor", "director", "producer", "writer"]
date_attributes = [
"added", "episode_added", "release", "episode_air_date", "last_played", "episode_last_played",
"artist_added", "artist_last_played", "album_last_played",
"album_added", "album_released", "track_last_played", "track_last_skipped", "track_last_rated", "track_added"
]
date_modifiers = ["", ".not", ".before", ".after"]
year_attributes = ["decade", "year", "episode_year", "album_year", "album_decade"]
number_attributes = ["plays", "episode_plays", "album_plays", "track_plays", "track_skips"] + year_attributes
2 years ago
number_modifiers = [".gt", ".gte", ".lt", ".lte"]
3 years ago
float_attributes = [
"user_rating", "episode_user_rating", "critic_rating", "episode_critic_rating", "audience_rating", "episode_audience_rating",
"duration", "artist_user_rating", "album_user_rating", "album_critic_rating", "track_user_rating"
3 years ago
]
2 years ago
float_modifiers = number_modifiers + [".rated"]
3 years ago
search_display = {"added": "Date Added", "release": "Release Date", "hdr": "HDR", "progress": "In Progress", "episode_progress": "Episode In Progress"}
tag_attributes = [
"actor", "audio_language", "collection", "content_rating", "country", "director", "genre", "label", "season_label", "episode_label", "network",
"producer", "resolution", "studio", "subtitle_language", "writer", "season_collection", "episode_collection", "edition",
"artist_genre", "artist_collection", "artist_country", "artist_mood", "artist_label", "artist_style", "album_genre", "album_mood",
"album_style", "album_format", "album_type", "album_collection", "album_source", "album_label", "track_mood", "track_source", "track_label"
]
tag_modifiers = ["", ".not", ".regex"]
3 years ago
no_not_mods = ["resolution", "decade", "album_decade"]
searches = boolean_attributes + \
[f"{f}{m}" for f in string_attributes for m in string_modifiers] + \
3 years ago
[f"{f}{m}" for f in tag_attributes + year_attributes for m in tag_modifiers if f not in no_not_mods or m != ".not"] + \
[f"{f}{m}" for f in date_attributes for m in date_modifiers] + \
2 years ago
[f"{f}{m}" for f in number_attributes for m in number_modifiers if f not in no_not_mods] + \
[f"{f}{m}" for f in float_attributes for m in float_modifiers if f != "duration" or m != ".rated"]
music_searches = [a for a in searches if a.startswith(("artist", "album", "track"))]
movie_sorts = {
"title.asc": "titleSort", "title.desc": "titleSort%3Adesc",
"year.asc": "year", "year.desc": "year%3Adesc",
"originally_available.asc": "originallyAvailableAt", "originally_available.desc": "originallyAvailableAt%3Adesc",
"release.asc": "originallyAvailableAt", "release.desc": "originallyAvailableAt%3Adesc",
"critic_rating.asc": "rating", "critic_rating.desc": "rating%3Adesc",
"audience_rating.asc": "audienceRating", "audience_rating.desc": "audienceRating%3Adesc",
"user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc",
"content_rating.asc": "contentRating", "content_rating.desc": "contentRating%3Adesc",
"duration.asc": "duration", "duration.desc": "duration%3Adesc",
"progress.asc": "viewOffset", "progress.desc": "viewOffset%3Adesc",
"plays.asc": "viewCount", "plays.desc": "viewCount%3Adesc",
"added.asc": "addedAt", "added.desc": "addedAt%3Adesc",
"viewed.asc": "lastViewedAt", "viewed.desc": "lastViewedAt%3Adesc",
"resolution.asc": "mediaHeight", "resolution.desc": "mediaHeight%3Adesc",
"bitrate.asc": "mediaBitrate", "bitrate.desc": "mediaBitrate%3Adesc",
"random": "random"
}
show_sorts = {
"title.asc": "titleSort", "title.desc": "titleSort%3Adesc",
"year.asc": "year", "year.desc": "year%3Adesc",
"originally_available.asc": "originallyAvailableAt", "originally_available.desc": "originallyAvailableAt%3Adesc",
"episode_originally_available.asc": "episode.originallyAvailableAt", "episode_originally_available.desc": "episode.originallyAvailableAt%3Adesc",
"release.asc": "originallyAvailableAt", "release.desc": "originallyAvailableAt%3Adesc",
"episode_release.asc": "episode.originallyAvailableAt", "episode_release.desc": "episode.originallyAvailableAt%3Adesc",
"critic_rating.asc": "rating", "critic_rating.desc": "rating%3Adesc",
"audience_rating.asc": "audienceRating", "audience_rating.desc": "audienceRating%3Adesc",
"user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc",
"content_rating.asc": "contentRating", "content_rating.desc": "contentRating%3Adesc",
"unplayed.asc": "unviewedLeafCount", "unplayed.desc": "unviewedLeafCount%3Adesc",
"episode_added.asc": "episode.addedAt", "episode_added.desc": "episode.addedAt%3Adesc",
"added.asc": "addedAt", "added.desc": "addedAt%3Adesc",
"viewed.asc": "lastViewedAt", "viewed.desc": "lastViewedAt%3Adesc",
"random": "random"
}
season_sorts = {
"season.asc": "season.index%2Cseason.titleSort", "season.desc": "season.index%3Adesc%2Cseason.titleSort",
"show.asc": "show.titleSort%2Cindex", "show.desc": "show.titleSort%3Adesc%2Cindex",
"user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc",
"added.asc": "addedAt", "added.desc": "addedAt%3Adesc",
"random": "random"
}
episode_sorts = {
"title.asc": "titleSort", "title.desc": "titleSort%3Adesc",
"show.asc": "show.titleSort%2Cseason.index%3AnullsLast%2Cepisode.index%3AnullsLast%2Cepisode.originallyAvailableAt%3AnullsLast%2Cepisode.titleSort%2Cepisode.id",
"show.desc": "show.titleSort%3Adesc%2Cseason.index%3AnullsLast%2Cepisode.index%3AnullsLast%2Cepisode.originallyAvailableAt%3AnullsLast%2Cepisode.titleSort%2Cepisode.id",
"year.asc": "year", "year.desc": "year%3Adesc",
"originally_available.asc": "originallyAvailableAt", "originally_available.desc": "originallyAvailableAt%3Adesc",
"episode_originally_available.asc": "episode.originallyAvailableAt", "episode_originally_available.desc": "episode.originallyAvailableAt%3Adesc",
"release.asc": "originallyAvailableAt", "release.desc": "originallyAvailableAt%3Adesc",
"episode_release.asc": "episode.originallyAvailableAt", "episode_release.desc": "episode.originallyAvailableAt%3Adesc",
"critic_rating.asc": "rating", "critic_rating.desc": "rating%3Adesc",
"audience_rating.asc": "audienceRating", "audience_rating.desc": "audienceRating%3Adesc",
"user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc",
"duration.asc": "duration", "duration.desc": "duration%3Adesc",
"progress.asc": "viewOffset", "progress.desc": "viewOffset%3Adesc",
"plays.asc": "viewCount", "plays.desc": "viewCount%3Adesc",
"added.asc": "addedAt", "added.desc": "addedAt%3Adesc",
"viewed.asc": "lastViewedAt", "viewed.desc": "lastViewedAt%3Adesc",
"resolution.asc": "mediaHeight", "resolution.desc": "mediaHeight%3Adesc",
"bitrate.asc": "mediaBitrate", "bitrate.desc": "mediaBitrate%3Adesc",
"random": "random"
}
artist_sorts = {
"title.asc": "titleSort", "title.desc": "titleSort%3Adesc",
"user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc",
"added.asc": "addedAt", "added.desc": "addedAt%3Adesc",
"played.asc": "lastViewedAt", "played.desc": "lastViewedAt%3Adesc",
"plays.asc": "viewCount", "plays.desc": "viewCount%3Adesc",
"random": "random"
}
album_sorts = {
"title.asc": "titleSort", "title.desc": "titleSort%3Adesc",
"album_artist.asc": "artist.titleSort%2Calbum.titleSort%2Calbum.index%2Calbum.id%2Calbum.originallyAvailableAt",
"album_artist.desc": "artist.titleSort%3Adesc%2Calbum.titleSort%2Calbum.index%2Calbum.id%2Calbum.originallyAvailableAt",
3 years ago
"year.asc": "year", "year.desc": "year%3Adesc",
"originally_available.asc": "originallyAvailableAt", "originally_available.desc": "originallyAvailableAt%3Adesc",
"release.asc": "originallyAvailableAt", "release.desc": "originallyAvailableAt%3Adesc",
"critic_rating.asc": "rating", "critic_rating.desc": "rating%3Adesc",
"user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc",
"added.asc": "addedAt", "added.desc": "addedAt%3Adesc",
"played.asc": "lastViewedAt", "played.desc": "lastViewedAt%3Adesc",
"plays.asc": "viewCount", "plays.desc": "viewCount%3Adesc",
"random": "random"
}
track_sorts = {
"title.asc": "titleSort", "title.desc": "titleSort%3Adesc",
"album_artist.asc": "artist.titleSort%2Calbum.titleSort%2Calbum.year%2Ctrack.absoluteIndex%2Ctrack.index%2Ctrack.titleSort%2Ctrack.id",
"album_artist.desc": "artist.titleSort%3Adesc%2Calbum.titleSort%2Calbum.year%2Ctrack.absoluteIndex%2Ctrack.index%2Ctrack.titleSort%2Ctrack.id",
"artist.asc": "originalTitle", "artist.desc": "originalTitle%3Adesc",
"album.asc": "album.titleSort", "album.desc": "album.titleSort%3Adesc",
"user_rating.asc": "userRating", "user_rating.desc": "userRating%3Adesc",
"duration.asc": "duration", "duration.desc": "duration%3Adesc",
"plays.asc": "viewCount", "plays.desc": "viewCount%3Adesc",
"added.asc": "addedAt", "added.desc": "addedAt%3Adesc",
"played.asc": "lastViewedAt", "played.desc": "lastViewedAt%3Adesc",
"rated.asc": "lastRatedAt", "rated.desc": "lastRatedAt%3Adesc",
"popularity.asc": "ratingCount", "popularity.desc": "ratingCount%3Adesc",
"bitrate.asc": "mediaBitrate", "bitrate.desc": "mediaBitrate%3Adesc",
"random": "random"
}
sort_types = {
"movie": ("title.asc", 1, movie_sorts),
"show": ("title.asc", 2, show_sorts),
"season": ("season.asc", 3, season_sorts),
"episode": ("title.asc", 4, episode_sorts),
"artist": ("title.asc", 8, artist_sorts),
"album": ("title.asc", 9, album_sorts),
"track": ("title.asc", 10, track_sorts)
}
watchlist_sorts = {
"added.asc": "watchlistedAt:asc", "added.desc": "watchlistedAt:desc",
"title.asc": "titleSort:asc", "title.desc": "titleSort:desc",
"release.asc": "originallyAvailableAt:asc", "release.desc": "originallyAvailableAt:desc",
"critic_rating.asc": "rating:asc", "critic_rating.desc": "rating:desc",
}
3 years ago
class Plex(Library):
def __init__(self, config, params):
3 years ago
super().__init__(config, params)
self.plex = params["plex"]
self.url = params["plex"]["url"]
self.token = params["plex"]["token"]
self.timeout = params["plex"]["timeout"]
logger.secret(self.url)
logger.secret(self.token)
try:
self.PlexServer = PlexServer(baseurl=self.url, token=self.token, session=self.config.session, timeout=self.timeout)
2 years ago
plexapi.server.TIMEOUT = self.timeout
os.environ["PLEXAPI_PLEXAPI_TIMEOUT"] = str(self.timeout)
except Unauthorized:
raise Failed("Plex Error: Plex token is invalid")
except ValueError as e:
raise Failed(f"Plex Error: {e}")
except (requests.exceptions.ConnectionError, ParseError):
logger.stacktrace()
4 years ago
raise Failed("Plex Error: Plex url is invalid")
3 years ago
self.Plex = None
library_names = []
for s in self.PlexServer.library.sections():
library_names.append(s.title)
if s.title == params["name"]:
self.Plex = s
break
if not self.Plex:
raise Failed(f"Plex Error: Plex Library '{params['name']}' not found. Options: {library_names}")
if self.Plex.type not in library_types:
raise Failed(f"Plex Error: Plex Library must be a Movies, TV Shows, or Music library")
if not self.Plex.allowSync:
raise Failed("Plex Error: Plex Token is read only. Please get a new token")
4 years ago
self.type = self.Plex.type.capitalize()
self.plex_pass = self.PlexServer.myPlexSubscription
self._users = []
3 years ago
self._all_items = []
self._account = None
self.agent = self.Plex.agent
self.is_movie = self.type == "Movie"
self.is_show = self.type == "Show"
self.is_music = self.type == "Artist"
self.is_other = self.agent == "com.plexapp.agents.none"
if self.is_other and self.type == "Movie":
self.type = "Video"
if not self.is_music and self.update_blank_track_titles:
self.update_blank_track_titles = False
logger.error(f"update_blank_track_titles library operation only works with music libraries")
def notify(self, text, collection=None, critical=True):
self.config.notify(text, server=self.PlexServer.friendlyName, library=self.name, collection=collection, critical=critical)
def notify_delete(self, message):
self.config.notify_delete(message, server=self.PlexServer.friendlyName, library=self.name)
def set_server_preroll(self, preroll):
self.PlexServer.settings.get('cinemaTrailersPrerollID').set(preroll)
self.PlexServer.settings.save()
def get_all_collections(self, label=None):
args = "?type=18"
if label:
label_id = next((c.key for c in self.get_tags("label") if c.title == label), None) # noqa
if label_id:
args = f"{args}&label={label_id}"
else:
return []
return self.get_filter_items(args)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def search(self, title=None, sort=None, maxresults=None, libtype=None, **kwargs):
return self.Plex.search(title=title, sort=sort, maxresults=maxresults, libtype=libtype, **kwargs)
4 years ago
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def exact_search(self, title, libtype=None, year=None):
terms = {"title=": title}
if year:
terms["year"] = year
return self.Plex.search(libtype=libtype, **terms)
def fetch_item(self, item):
if isinstance(item, (Movie, Show, Season, Episode, Artist, Album, Track)):
return self.reload(item)
key = int(item)
if key in self.cached_items:
return self.reload(self.cached_items[key][0])
try:
current = self.fetchItem(key)
if isinstance(current, (Movie, Show, Season, Episode, Artist, Album, Track)):
return self.reload(current)
except (BadRequest, NotFound) as e:
logger.trace(e)
raise Failed(f"Plex Error: Item {item} not found")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
4 years ago
def fetchItem(self, data):
return self.PlexServer.fetchItem(data)
def get_all(self, builder_level=None, load=False):
if load and builder_level in [None, "show", "artist", "movie"]:
3 years ago
self._all_items = []
if self._all_items and builder_level in [None, "show", "artist", "movie"]:
3 years ago
return self._all_items
builder_type = builder_level if builder_level else self.Plex.TYPE
if not builder_level:
builder_level = self.type
logger.info(f"Loading All {builder_level.capitalize()}s from Library: {self.name}")
key = f"/library/sections/{self.Plex.key}/all?includeGuids=1&type={utils.searchType(builder_type)}"
container_start = 0
container_size = plexapi.X_PLEX_CONTAINER_SIZE
results = []
while self.Plex._totalViewSize is None or container_start <= self.Plex._totalViewSize:
results.extend(self.fetchItems(key, container_start, container_size))
logger.ghost(f"Loaded: {container_start}/{self.Plex._totalViewSize}")
container_start += container_size
logger.info(f"Loaded {self.Plex._totalViewSize} {builder_level.capitalize()}s")
if builder_level in [None, "show", "artist", "movie"]:
self._all_items = results
return results
def upload_theme(self, collection, url=None, filepath=None):
key = f"/library/metadata/{collection.ratingKey}/themes"
if url:
self.PlexServer.query(f"{key}?url={parse.quote_plus(url)}", method=self.PlexServer._session.post)
elif filepath:
self.PlexServer.query(key, method=self.PlexServer._session.post, data=open(filepath, 'rb').read())
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def create_playlist(self, name, items):
return self.PlexServer.createPlaylist(name, items=items)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def fetchItems(self, key, container_start, container_size):
return self.Plex.fetchItems(key, container_start=container_start, container_size=container_size)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def moveItem(self, obj, item, after):
try:
obj.moveItem(item, after=after)
except (BadRequest, NotFound, Unauthorized) as e:
logger.error(e)
raise Failed("Move Failed")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def query(self, method):
return method()
def delete(self, obj):
try:
return self.query(obj.delete)
except Exception:
logger.stacktrace()
raise Failed(f"Plex Error: Failed to delete {obj.title}")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def query_data(self, method, data):
return method(data)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def tag_edit(self, item, attribute, data, locked=True, remove=False):
return item.editTags(attribute, data, locked=locked, remove=remove)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def query_collection(self, item, collection, locked=True, add=True):
if add:
item.addCollection(collection, locked=locked)
else:
item.removeCollection(collection, locked=locked)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def collection_mode_query(self, collection, data):
collection.modeUpdate(mode=data)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def collection_order_query(self, collection, data):
collection.sortUpdate(sort=data)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def item_labels(self, item):
try:
return item.labels
except BadRequest:
raise Failed(f"Item: {item.title} Labels failed to load")
def find_poster_url(self, item):
if isinstance(item, Movie):
if item.ratingKey in self.movie_rating_key_map:
return self.config.TMDb.get_movie(self.movie_rating_key_map[item.ratingKey]).poster_url
elif isinstance(item, (Show, Season, Episode)):
check_key = item.ratingKey if isinstance(item, Show) else item.show().ratingKey
if check_key in self.show_rating_key_map:
tmdb_id = self.config.Convert.tvdb_to_tmdb(self.show_rating_key_map[check_key])
if isinstance(item, Show) and item.ratingKey in self.show_rating_key_map:
return self.config.TMDb.get_show(tmdb_id).poster_url
elif isinstance(item, Season):
return self.config.TMDb.get_season(tmdb_id, item.seasonNumber).poster_url
elif isinstance(item, Episode):
return self.config.TMDb.get_episode(tmdb_id, item.seasonNumber, item.episodeNumber).still_url
def item_posters(self, item, providers=None):
if providers is None:
providers = ["plex", "tmdb"]
image_url = None
for provider in providers:
if provider == "plex":
for poster in item.posters():
if poster.key.startswith("/"):
image_url = f"{self.url}{poster.key}&X-Plex-Token={self.token}"
if poster.ratingKey.startswith("upload"):
try:
self.check_image_for_overlay(image_url, os.path.join(self.overlay_backup, "temp"), remove=True)
except Failed as e:
logger.trace(f"Plex Error: {e}")
continue
2 years ago
else:
image_url = poster.key
break
if provider == "tmdb":
try:
image_url = self.find_poster_url(item)
except Failed as e:
logger.trace(e)
continue
if image_url:
break
if not image_url and "plex" in providers and isinstance(item, Season):
for poster in item.show().posters():
if poster.key.startswith("/"):
image_url = f"{self.url}{poster.key}&X-Plex-Token={self.token}"
if poster.ratingKey.startswith("upload"):
try:
self.check_image_for_overlay(image_url, os.path.join(self.overlay_backup, "temp"), remove=True)
except Failed as e:
logger.trace(f"Plex Error: {e}")
continue
else:
image_url = poster.key
break
if not image_url:
raise Failed("Overlay Error: No Poster found to reset")
return image_url
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def reload(self, item, force=False):
is_full = False
if not force and item.ratingKey in self.cached_items:
item, is_full = self.cached_items[item.ratingKey]
try:
if not is_full or force:
item.reload(checkFiles=False, includeAllConcerts=False, includeBandwidths=False, includeChapters=False,
includeChildren=False, includeConcerts=False, includeExternalMedia=False, includeExtras=False,
includeFields=False, includeGeolocation=False, includeLoudnessRamps=False, includeMarkers=False,
includeOnDeck=False, includePopularLeaves=False, includeRelated=False, includeRelatedCount=0,
includeReviews=False, includeStations=False)
item._autoReload = False
self.cached_items[item.ratingKey] = (item, True)
except (BadRequest, NotFound) as e:
logger.stacktrace()
raise Failed(f"Item Failed to Load: {e}")
return item
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
4 years ago
def edit_query(self, item, edits, advanced=False):
if advanced:
item.editAdvanced(**edits)
else:
item.edit(**edits)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def _upload_image(self, item, image):
3 years ago
try:
if image.is_poster and image.is_url:
item.uploadPoster(url=image.location)
elif image.is_poster:
item.uploadPoster(filepath=image.location)
elif image.is_url:
item.uploadArt(url=image.location)
else:
item.uploadArt(filepath=image.location)
self.reload(item, force=True)
3 years ago
except BadRequest as e:
3 years ago
item.refresh()
3 years ago
raise Failed(e)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def upload_poster(self, item, image, url=False):
if url:
item.uploadPoster(url=image)
else:
item.uploadPoster(filepath=image)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def upload_background(self, item, image, url=False):
if url:
item.uploadArt(url=image)
else:
item.uploadArt(filepath=image)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def get_actor_id(self, name):
results = self.Plex.hubSearch(name)
for result in results:
if isinstance(result, Role) and result.librarySectionID == self.Plex.key and result.tag == name:
return result.id
def get_search_choices(self, search_name, title=True, name_pairs=False):
final_search = search_translation[search_name] if search_name in search_translation else search_name
3 years ago
final_search = show_translation[final_search] if self.is_show and final_search in show_translation else final_search
try:
3 years ago
names = []
4 years ago
choices = {}
use_title = title and final_search not in ["contentRating", "audioLanguage", "subtitleLanguage", "resolution"]
for choice in self.get_tags(final_search):
if choice.title not in names:
names.append((choice.title, choice.key) if name_pairs else choice.title)
choices[choice.title] = choice.title if use_title else choice.key
choices[choice.key] = choice.title if use_title else choice.key
choices[choice.title.lower()] = choice.title if use_title else choice.key
choices[choice.key.lower()] = choice.title if use_title else choice.key
3 years ago
return choices, names
except NotFound:
3 years ago
logger.debug(f"Search Attribute: {final_search}")
3 years ago
raise Failed(f"Plex Error: plex_search attribute: {search_name} not supported")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def get_tags(self, tag):
if isinstance(tag, str):
match = re.match(r'(?:([a-zA-Z]*)\.)?([a-zA-Z]+)', tag)
if not match:
raise BadRequest(f'Invalid filter field: {tag}')
_libtype, tag = match.groups()
libtype = _libtype or self.Plex.TYPE
try:
tag = next(f for f in self.Plex.listFilters(libtype) if f.filter == tag)
except StopIteration:
availableFilters = [f.filter for f in self.Plex.listFilters(libtype)]
raise NotFound(f'Unknown filter field "{tag}" for libtype "{libtype}". '
f'Available filters: {availableFilters}') from None
items = self.Plex.findItems(self.Plex._server.query(tag.key), FilterChoice)
if tag.key.endswith("/collection?type=4"):
keys = [k.key for k in items]
keys.extend([k.key for k in self.Plex.findItems(self.Plex._server.query(f"{tag.key[:-1]}3"), FilterChoice)])
items = [i for i in self.Plex.findItems(self.Plex._server.query(tag.key[:-7]), FilterChoice) if i.key not in keys]
return items
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def _query(self, key, post=False, put=False):
if post: method = self.Plex._server._session.post
elif put: method = self.Plex._server._session.put
else: method = None
return self.Plex._server.query(key, method=method)
@property
def users(self):
if not self._users:
users = []
for user in self.account.users():
if self.PlexServer.machineIdentifier in [s.machineIdentifier for s in user.servers]:
users.append(user.title)
self._users = users
return self._users
def delete_user_playlist(self, title, user):
try:
self.delete(self.PlexServer.switchUser(user).playlist(title))
except NotFound as e:
raise Failed(e)
@property
def account(self):
if self._account is None:
self._account = self.PlexServer.myPlexAccount()
return self._account
def playlist_report(self):
playlists = {}
def scan_user(server, username):
2 years ago
try:
for playlist in server.playlists():
if isinstance(playlist, Playlist):
if playlist.title not in playlists:
playlists[playlist.title] = []
playlists[playlist.title].append(username)
2 years ago
except requests.exceptions.ConnectionError:
pass
scan_user(self.PlexServer, self.account.title)
for user in self.users:
scan_user(self.PlexServer.switchUser(user), user)
return playlists
def manage_recommendations(self):
return [(r.title, r._data.attrib.get('identifier'), r._data.attrib.get('promotedToRecommended'),
r._data.attrib.get('promotedToOwnHome'), r._data.attrib.get('promotedToSharedHome'))
for r in self.Plex.fetchItems(f"/hubs/sections/{self.Plex.key}/manage")]
def alter_collection(self, item, collection, smart_label_collection=False, add=True):
collection = str(collection)
if smart_label_collection:
self.query_data(item.addLabel if add else item.removeLabel, collection)
else:
locked = True
item = self.reload(item)
if self.agent in ["tv.plex.agents.movie", "tv.plex.agents.series"]:
field = next((f for f in item.fields if f.name == "collection"), None)
locked = field is not None
self.query_collection(item, collection, locked=locked, add=add)
def move_item(self, collection, item, after=None):
key = f"{collection.key}/items/{item}/move"
if after:
key += f"?after={after}"
self._query(key, put=True)
def smart_label_check(self, label):
labels = [la.title for la in self.get_tags("label")] # noqa
if label in labels:
return True
logger.trace(f"Label not found in Plex. Options: {labels}")
return False
def test_smart_filter(self, uri_args):
logger.debug(f"Smart Collection Test: {uri_args}")
test_items = self.get_filter_items(uri_args)
if len(test_items) < 1:
raise Failed(f"Plex Error: No items for smart filter: {uri_args}")
def create_smart_collection(self, title, smart_type, uri_args, ignore_blank_results):
if not ignore_blank_results:
self.test_smart_filter(uri_args)
args = {
"type": smart_type,
"title": title,
"smart": 1,
"sectionId": self.Plex.key,
"uri": self.build_smart_filter(uri_args)
}
self._query(f"/library/collections{utils.joinArgs(args)}", post=True)
def create_blank_collection(self, title):
args = {
"type": 1 if self.is_movie else 2 if self.is_show else 8,
"title": title,
"smart": 0,
"sectionId": self.Plex.key,
"uri": f"{self.PlexServer._uriRoot()}/library/metadata"
}
self._query(f"/library/collections{utils.joinArgs(args)}", post=True)
def get_smart_filter_from_uri(self, uri):
smart_filter = parse.parse_qs(parse.urlparse(uri.replace("/#!/", "/")).query)["key"][0] # noqa
args = smart_filter[smart_filter.index("?"):]
return self.build_smart_filter(args), int(args[args.index("type=") + 5:args.index("type=") + 6])
def build_smart_filter(self, uri_args):
return f"{self.PlexServer._uriRoot()}/library/sections/{self.Plex.key}/all{uri_args}"
def update_smart_collection(self, collection, uri_args):
self.test_smart_filter(uri_args)
self._query(f"/library/collections/{collection.ratingKey}/items{utils.joinArgs({'uri': self.build_smart_filter(uri_args)})}", put=True)
def smart_filter(self, collection):
smart_filter = self.get_collection(collection).content
return smart_filter[smart_filter.index("?"):]
def collection_visibility(self, collection):
try:
attrs = self._query(f"/hubs/sections/{self.Plex.key}/manage?metadataItemId={collection.ratingKey}")[0].attrib
return {
"library": utils.cast(bool, attrs.get("promotedToRecommended", "0")),
"home": utils.cast(bool, attrs.get("promotedToOwnHome", "0")),
"shared": utils.cast(bool, attrs.get("promotedToSharedHome", "0"))
}
except IndexError:
return {"library": False, "home": False, "shared": False}
def collection_visibility_update(self, collection, visibility=None, library=None, home=None, shared=None):
if visibility is None:
visibility = self.collection_visibility(collection)
key = f"/hubs/sections/{self.Plex.key}/manage?metadataItemId={collection.ratingKey}"
key += f"&promotedToRecommended={1 if (library is None and visibility['library']) or library else 0}"
key += f"&promotedToOwnHome={1 if (home is None and visibility['home']) or home else 0}"
key += f"&promotedToSharedHome={1 if (shared is None and visibility['shared']) or shared else 0}"
self._query(key, post=True)
def get_playlist(self, title):
try:
return self.PlexServer.playlist(title)
except NotFound:
raise Failed(f"Plex Error: Playlist {title} not found")
def get_collection(self, data, force_search=False, debug=True):
if isinstance(data, Collection):
3 years ago
return data
elif isinstance(data, int) and not force_search:
return self.fetchItem(data)
else:
3 years ago
cols = self.search(title=str(data), libtype="collection")
for d in cols:
3 years ago
if d.title == data:
return d
if debug:
logger.debug("")
for d in cols:
logger.debug(f"Found: {d.title}")
logger.debug(f"Looking for: {data}")
raise Failed(f"Plex Error: Collection {data} not found")
4 years ago
def validate_collections(self, collections):
valid_collections = []
for collection in collections:
try:
valid_collections.append(self.get_collection(collection))
except Failed as e:
logger.error(e)
4 years ago
if len(valid_collections) == 0:
raise Failed(f"Collection Error: No valid Plex Collections in {collections}")
4 years ago
return valid_collections
def get_watchlist(self, sort=None, is_playlist=False):
if is_playlist:
libtype = None
elif self.is_movie:
libtype = "movie"
else:
libtype = "show"
watchlist = self.account.watchlist(sort=watchlist_sorts[sort], libtype=libtype)
ids = []
for item in watchlist:
tmdb_id = []
tvdb_id = []
imdb_id = []
if self.config.Cache:
cache_id, _, media_type, _ = self.config.Cache.query_guid_map(item.guid)
if cache_id:
ids.extend([(t_id, "tmdb" if "movie" in media_type else "tvdb") for t_id in cache_id])
continue
try:
fin = False
for guid_tag in item.guids:
url_parsed = requests.utils.urlparse(guid_tag.id)
if url_parsed.scheme == "tvdb":
if isinstance(item, Show):
ids.append((int(url_parsed.netloc), "tvdb"))
fin = True
elif url_parsed.scheme == "imdb":
imdb_id.append(url_parsed.netloc)
elif url_parsed.scheme == "tmdb":
if isinstance(item, Movie):
ids.append((int(url_parsed.netloc), "tmdb"))
fin = True
tmdb_id.append(int(url_parsed.netloc))
if fin:
break
if fin:
continue
except requests.exceptions.ConnectionError:
continue
if imdb_id and not tmdb_id:
for imdb in imdb_id:
tmdb, tmdb_type = self.config.Convert.imdb_to_tmdb(imdb)
if tmdb:
tmdb_id.append(tmdb)
if tmdb_id and isinstance(item, Show) and not tvdb_id:
for tmdb in tmdb_id:
tvdb = self.config.Convert.tmdb_to_tvdb(tmdb)
if tvdb:
tvdb_id.append(tvdb)
if isinstance(item, Show) and tvdb_id:
ids.extend([(t_id, "tvdb") for t_id in tvdb_id])
if isinstance(item, Movie) and tmdb_id:
ids.extend([(t_id, "tmdb") for t_id in tmdb_id])
return ids
def get_rating_keys(self, method, data, is_playlist=False):
items = []
if method == "plex_all":
3 years ago
logger.info(f"Processing Plex All {data.capitalize()}s")
items = self.get_all(builder_level=data)
elif method == "plex_watchlist":
logger.info(f"Processing Plex Watchlist")
return self.get_watchlist(sort=data, is_playlist=is_playlist)
elif method == "plex_pilots":
logger.info(f"Processing Plex Pilot {data.capitalize()}s")
items = []
for item in self.get_all():
try:
items.append(item.episode(season=1, episode=1))
except NotFound:
logger.warning(f"Plex Warning: {item.title} has no Season 1 Episode 1 ")
elif method == "plex_search":
logger.info(f"Processing {data[1]}")
logger.trace(data[2])
items = self.get_filter_items(data[2])
elif method == "plex_collectionless":
good_collections = []
3 years ago
logger.info(f"Processing Plex Collectionless")
logger.info("")
for col in self.get_all_collections():
keep_collection = True
for pre in data["exclude_prefix"]:
if col.title.startswith(pre) or (col.titleSort and col.titleSort.startswith(pre)):
keep_collection = False
logger.info(f"Excluded by Prefix Match: {col.title}")
break
if keep_collection:
for ext in data["exclude"]:
if col.title == ext or (col.titleSort and col.titleSort == ext):
keep_collection = False
logger.info(f"Excluded by Exact Match: {col.title}")
break
if keep_collection:
good_collections.append(col)
logger.info("")
logger.info("Collections Not Excluded (Items in these collections are not added to Collectionless)")
for col in good_collections:
logger.info(col.title)
logger.info("")
collection_indexes = [c.title for c in good_collections]
all_items = self.get_all()
for i, item in enumerate(all_items, 1):
logger.ghost(f"Processing: {i}/{len(all_items)} {item.title}")
add_item = True
for collection in item.collections:
if collection.tag in collection_indexes:
add_item = False
break
if add_item:
items.append(item)
logger.info(f"Processed {len(all_items)} {self.type}s")
else:
raise Failed(f"Plex Error: Method {method} not supported")
2 years ago
if not items:
raise Failed("Plex Error: No Items found in Plex")
return [(item.ratingKey, "ratingKey") for item in items]
def get_collection_items(self, collection, smart_label_collection):
if smart_label_collection:
return self.search(label=collection.title if isinstance(collection, Collection) else str(collection))
elif isinstance(collection, (Collection, Playlist)):
if collection.smart:
return self.get_filter_items(self.smart_filter(collection))
else:
return self.query(collection.items)
else:
return []
def get_filter_items(self, uri_args):
key = f"/library/sections/{self.Plex.key}/all{uri_args}"
return self.Plex._search(key, None, 0, plexapi.X_PLEX_CONTAINER_SIZE)
def get_collection_name_and_items(self, collection, smart_label_collection):
name = collection.title if isinstance(collection, (Collection, Playlist)) else str(collection)
return name, self.get_collection_items(collection, smart_label_collection)
def get_tmdb_from_map(self, item):
return self.movie_rating_key_map[item.ratingKey] if item.ratingKey in self.movie_rating_key_map else None
def get_tvdb_from_map(self, item):
return self.show_rating_key_map[item.ratingKey] if item.ratingKey in self.show_rating_key_map else None
def search_item(self, data, year=None, edition=None):
4 years ago
kwargs = {}
if year is not None:
kwargs["year"] = year
if edition is not None:
kwargs["editionTitle"] = edition
return [d for d in self.search(title=str(data), **kwargs) if d.title == data]
def edit_advance(self, item, edits):
try:
self.edit_query(item, edits, advanced=True)
if "languageOverride" in edits or "useOriginalTitle" in edits:
self.query(item.refresh)
return True
except BadRequest:
logger.stacktrace()
return False
def edit_tags(self, attr, obj, add_tags=None, remove_tags=None, sync_tags=None, do_print=True, locked=True, is_locked=None):
display = ""
final = ""
key = attribute_translation[attr] if attr in attribute_translation else attr
actual = "similar" if attr == "similar_artist" else attr
attr_display = attr.replace("_", " ").title()
if add_tags or remove_tags or sync_tags is not None:
_add_tags = add_tags if add_tags else []
_remove_tags = remove_tags if remove_tags else []
_sync_tags = sync_tags if sync_tags else []
try:
obj = self.reload(obj)
_item_tags = [item_tag.tag for item_tag in getattr(obj, key)]
except BadRequest:
_item_tags = []
_add = [t for t in _add_tags + _sync_tags if t not in _item_tags]
_remove = [t for t in _item_tags if (sync_tags is not None and t not in _sync_tags) or t in _remove_tags]
if _add:
self.tag_edit(obj, actual, _add, locked=locked)
display += f"+{', +'.join(_add)}"
if _remove:
self.tag_edit(obj, actual, _remove, locked=locked, remove=True)
2 years ago
if display:
display += ", "
display += f"-{', -'.join(_remove)}"
if is_locked is not None and not display and is_locked != locked:
self.edit_query(obj, {f"{actual}.locked": 1 if locked else 0})
display = "Locked" if locked else "Unlocked"
final = f"{obj.title[:25]:<25} | {attr_display} | {display}" if display else display
if do_print and final:
logger.info(final)
return final[28:] if final else final
def image_update(self, item, image, tmdb=None, title=None, poster=True):
text = f"{f'{title} ' if title else ''}{'Poster' if poster else 'Background'}"
attr = self.mass_poster_update if poster else self.mass_background_update
if attr == "lock":
self.query(item.lockPoster if poster else item.lockArt)
logger.info(f"{text} | Locked")
elif attr == "unlock":
self.query(item.unlockPoster if poster else item.unlockArt)
logger.info(f"{text} | Unlocked")
else:
location = "the Assets Directory" if image else ""
image_url = False if image else True
image = image.location if image else None
if not image:
if attr == "tmdb" and tmdb:
image = tmdb
location = "TMDb"
if not image:
images = item.posters() if poster else item.arts()
temp_image = next((p for p in images), None)
if temp_image:
if temp_image.key.startswith("/"):
image = f"{self.url}{temp_image.key}&X-Plex-Token={self.token}"
else:
image = temp_image.key
location = "Plex"
if image:
logger.info(f"{text} | Reset from {location}")
if poster:
try:
self.upload_poster(item, image, url=image_url)
except BadRequest as e:
logger.stacktrace()
logger.error(f"Plex Error: {e}")
else:
try:
self.upload_background(item, image, url=image_url)
except BadRequest as e:
logger.stacktrace()
logger.error(f"Plex Error: {e}")
if poster and "Overlay" in [la.tag for la in self.item_labels(item)]:
logger.info(self.edit_tags("label", item, remove_tags="Overlay", do_print=False))
else:
logger.warning(f"{text} | No Reset Image Found")
def item_images(self, item, group, alias, initial=False, asset_location=None, asset_directory=None, title=None, image_name=None, folder_name=None, style_data=None):
if title is None:
title = item.title
posters, backgrounds = util.get_image_dicts(group, alias)
if style_data and "url_poster" in style_data and style_data["url_poster"]:
posters["style_data"] = style_data["url_poster"]
elif style_data and "tpdb_poster" in style_data and style_data["tpdb_poster"]:
posters["style_data"] = f"https://theposterdb.com/api/assets/{style_data['tpdb_poster']}"
if style_data and "url_background" in style_data and style_data["url_background"]:
backgrounds["style_data"] = style_data["url_background"]
elif style_data and "tpdb_background" in style_data and style_data["tpdb_background"]:
backgrounds["style_data"] = f"https://theposterdb.com/api/assets/{style_data['tpdb_background']}"
try:
asset_poster, asset_background, item_dir, folder_name = self.find_item_assets(item, item_asset_directory=asset_location, asset_directory=asset_directory)
if asset_poster:
posters["asset_directory"] = asset_poster
if asset_background:
backgrounds["asset_directory"] = asset_background
if asset_location is None or initial:
asset_location = item_dir
except Failed as e:
logger.warning(e)
poster = util.pick_image(title, posters, self.prioritize_assets, self.download_url_assets, asset_location, image_name=image_name)
background = util.pick_image(title, backgrounds, self.prioritize_assets, self.download_url_assets, asset_location,
is_poster=False, image_name=f"{image_name}_background" if image_name else image_name)
updated = False
if poster or background:
pu, bu = self.upload_images(item, poster=poster, background=background, overlay=True)
if pu or bu:
updated = True
return asset_location, folder_name, updated
def find_and_upload_assets(self, item, current_labels, asset_directory=None):
item_dir = None
name = None
2 years ago
try:
poster, background, item_dir, name = self.find_item_assets(item, asset_directory=asset_directory)
2 years ago
if "Overlay" not in current_labels:
if poster or background:
self.upload_images(item, poster=poster, background=background)
elif self.show_missing_assets:
logger.warning(f"Asset Warning: No poster or background found in the assets folder '{item_dir}'")
else:
logger.warning(f"Asset Warning: {name} has an Overlay and will be updated when overlays are run")
except Failed as e:
if self.show_missing_assets:
logger.warning(e)
if isinstance(item, Show) and ((self.asset_folders and item_dir) or not self.asset_folders):
missing_seasons = ""
missing_episodes = ""
found_season = False
found_episode = False
for season in self.query(item.seasons):
try:
season_poster, season_background, _, _ = self.find_item_assets(season, item_asset_directory=item_dir, asset_directory=asset_directory, folder_name=name)
2 years ago
if season_poster:
found_season = True
elif self.show_missing_season_assets and season.seasonNumber > 0:
missing_seasons += f"\nMissing Season {season.seasonNumber} Poster"
2 years ago
if season_poster or season_background and "Overlay" not in [la.tag for la in self.item_labels(season)]:
2 years ago
self.upload_images(season, poster=season_poster, background=season_background)
except Failed as e:
if self.show_missing_assets:
logger.warning(e)
for episode in self.query(season.episodes):
try:
2 years ago
if episode.seasonEpisode:
episode_poster, episode_background, _, _ = self.find_item_assets(episode, item_asset_directory=item_dir, asset_directory=asset_directory, folder_name=name)
2 years ago
if episode_poster or episode_background:
found_episode = True
2 years ago
if "Overlay" not in [la.tag for la in self.item_labels(episode)]:
self.upload_images(episode, poster=episode_poster, background=episode_background)
2 years ago
elif self.show_missing_episode_assets:
missing_episodes += f"\nMissing {episode.seasonEpisode.upper()} Title Card"
except Failed as e:
if self.show_missing_assets:
logger.warning(e)
if (found_season and missing_seasons) or (found_episode and missing_episodes):
logger.info(f"Missing Posters for {item.title}{missing_seasons}{missing_episodes}")
if isinstance(item, Artist):
missing_assets = ""
found_album = False
for album in self.query(item.albums):
try:
album_poster, album_background, _, _ = self.find_item_assets(album, item_asset_directory=item_dir, asset_directory=asset_directory, folder_name=name)
2 years ago
if album_poster or album_background:
found_album = True
elif self.show_missing_season_assets:
missing_assets += f"\nMissing Album {album.title} Poster"
if album_poster or album_background:
self.upload_images(album, poster=album_poster, background=album_background)
except Failed as e:
if self.show_missing_assets:
logger.warning(e)
if self.show_missing_season_assets and found_album and missing_assets:
logger.info(f"Missing Album Posters for {item.title}{missing_assets}")
2 years ago
def find_item_assets(self, item, item_asset_directory=None, asset_directory=None, folder_name=None):
3 years ago
poster = None
background = None
if asset_directory is None:
asset_directory = self.asset_directory
is_top_level = isinstance(item, (Movie, Artist, Show, Collection, Playlist, str))
3 years ago
if isinstance(item, Album):
prefix = f"{item.parentTitle} Album {item.title}'s "
3 years ago
file_name = item.title
elif isinstance(item, Season):
prefix = f"{item.parentTitle} Season {item.seasonNumber}'s "
2 years ago
file_name = f"Season{'0' if not item.seasonNumber or item.seasonNumber < 10 else ''}{item.seasonNumber}"
3 years ago
elif isinstance(item, Episode):
prefix = f"{item.grandparentTitle} {item.seasonEpisode.upper()}'s "
3 years ago
file_name = item.seasonEpisode.upper()
else:
prefix = f"{item if isinstance(item, str) else item.title}'s "
3 years ago
file_name = "poster"
if not item_asset_directory:
if isinstance(item, (Movie, Artist, Album, Show, Episode, Season)):
if isinstance(item, (Episode, Season)):
starting = item.show()
elif isinstance(item, (Album, Track)):
starting = item.artist()
else:
starting = item
if not starting.locations:
raise Failed(f"Asset Warning: No video filepath found fo {item.title}")
3 years ago
path_test = str(starting.locations[0])
if not os.path.dirname(path_test):
path_test = path_test.replace("\\", "/")
folder_name = os.path.basename(os.path.dirname(path_test) if isinstance(starting, Movie) else path_test)
elif isinstance(item, (Collection, Playlist)):
folder_name = item.title
3 years ago
else:
folder_name = item
folder_name, _ = util.validate_filename(folder_name)
3 years ago
if not self.asset_folders:
file_name = folder_name if file_name == "poster" else f"{folder_name}_{file_name}"
3 years ago
if not item_asset_directory:
3 years ago
for ad in asset_directory:
if self.asset_folders:
if os.path.isdir(os.path.join(ad, folder_name)):
item_asset_directory = os.path.join(ad, folder_name)
else:
for n in range(1, self.asset_depth + 1):
new_path = ad
for i in range(1, n + 1):
new_path = os.path.join(new_path, "*")
matches = util.glob_filter(os.path.join(new_path, folder_name))
if len(matches) > 0:
item_asset_directory = os.path.abspath(matches[0])
break
3 years ago
else:
matches = util.glob_filter(os.path.join(ad, f"{file_name}.*"))
if len(matches) > 0:
item_asset_directory = ad
if item_asset_directory:
break
if not item_asset_directory:
if self.asset_folders:
if self.create_asset_folders and asset_directory:
item_asset_directory = os.path.join(asset_directory[0], folder_name)
os.makedirs(item_asset_directory, exist_ok=True)
logger.warning(f"Asset Warning: Asset Directory Not Found and Created: {item_asset_directory}")
else:
raise Failed(f"Asset Warning: Unable to find asset folder: '{folder_name}'")
return None, None, item_asset_directory, folder_name
3 years ago
poster_filter = os.path.join(item_asset_directory, f"{file_name}.*")
background_filter = os.path.join(item_asset_directory, "background.*" if file_name == "poster" else f"{file_name}_background.*")
poster_matches = util.glob_filter(poster_filter)
if len(poster_matches) > 0:
poster = ImageData("asset_directory", os.path.abspath(poster_matches[0]), prefix=prefix, is_url=False)
background_matches = util.glob_filter(background_filter)
if len(background_matches) > 0:
background = ImageData("asset_directory", os.path.abspath(background_matches[0]), prefix=prefix, is_poster=False, is_url=False)
if is_top_level and self.asset_folders and self.dimensional_asset_rename and (not poster or not background):
for file in util.glob_filter(os.path.join(item_asset_directory, "*.*")):
if file.lower().endswith((".png", ".jpg", ".jpeg", "webp")) and not re.match(r"s\d+e\d+|season\d+", file.lower()):
3 years ago
try:
with Image.open(file) as image:
_w, _h = image.size
3 years ago
if not poster and _h >= _w:
new_path = os.path.join(os.path.dirname(file), f"poster{os.path.splitext(file)[1].lower()}")
os.rename(file, new_path)
poster = ImageData("asset_directory", os.path.abspath(new_path), prefix=prefix, is_url=False)
3 years ago
elif not background and _w > _h:
new_path = os.path.join(os.path.dirname(file), f"background{os.path.splitext(file)[1].lower()}")
os.rename(file, new_path)
background = ImageData("asset_directory", os.path.abspath(new_path), prefix=prefix, is_poster=False, is_url=False)
3 years ago
if poster and background:
break
except OSError:
logger.error(f"Asset Error: Failed to open image: {file}")
return poster, background, item_asset_directory, folder_name
def get_ids(self, item):
tmdb_id = None
tvdb_id = None
imdb_id = None
if self.config.Cache:
t_id, i_id, guid_media_type, _ = self.config.Cache.query_guid_map(item.guid)
if t_id:
if "movie" in guid_media_type:
tmdb_id = t_id[0]
else:
tvdb_id = t_id[0]
if i_id:
imdb_id = i_id[0]
if not tmdb_id and not tvdb_id:
tmdb_id = self.get_tmdb_from_map(item)
if not tmdb_id and not tvdb_id and self.is_show:
tvdb_id = self.get_tvdb_from_map(item)
return tmdb_id, tvdb_id, imdb_id
def get_locked_attributes(self, item, titles=None):
2 years ago
item = self.reload(item)
attrs = {}
fields = {f.name: f for f in item.fields if f.locked}
if isinstance(item, (Movie, Show)) and titles and titles.count(item.title) > 1:
map_key = f"{item.title} ({item.year})"
attrs["title"] = item.title
attrs["year"] = item.year
elif isinstance(item, (Season, Episode, Track)) and item.index:
map_key = int(item.index)
else:
map_key = item.title
if "title" in fields:
if isinstance(item, (Movie, Show)):
tmdb_id, tvdb_id, imdb_id = self.get_ids(item)
tmdb_item = self.config.TMDb.get_item(item, tmdb_id, tvdb_id, imdb_id, is_movie=isinstance(item, Movie))
if tmdb_item:
attrs["alt_title"] = tmdb_item.title
elif isinstance(item, (Season, Episode, Track)):
attrs["title"] = item.title
def check_field(plex_key, pmm_key, var_key=None):
if plex_key in fields and pmm_key not in self.metadata_backup["exclude"]:
if not var_key:
var_key = plex_key
if hasattr(item, var_key):
plex_value = getattr(item, var_key)
if isinstance(plex_value, list):
plex_tags = [t.tag for t in plex_value]
if len(plex_tags) > 0 or self.metadata_backup["sync_tags"]:
attrs[f"{pmm_key}.sync" if self.metadata_backup["sync_tags"] else pmm_key] = None if not plex_tags else plex_tags[0] if len(plex_tags) == 1 else plex_tags
elif isinstance(plex_value, datetime):
attrs[pmm_key] = datetime.strftime(plex_value, "%Y-%m-%d")
else:
attrs[pmm_key] = plex_value
check_field("titleSort", "sort_title")
check_field("originalTitle", "original_artist" if self.is_music else "original_title")
check_field("originallyAvailableAt", "originally_available")
check_field("contentRating", "content_rating")
check_field("userRating", "user_rating")
check_field("audienceRating", "audience_rating")
check_field("rating", "critic_rating")
check_field("studio", "record_label" if self.is_music else "studio")
check_field("tagline", "tagline")
check_field("summary", "summary")
check_field("index", "track")
check_field("parentIndex", "disc")
check_field("director", "director", var_key="directors")
check_field("country", "country", var_key="countries")
check_field("genre", "genre", var_key="genres")
check_field("writer", "writer", var_key="writers")
check_field("producer", "producer", var_key="producers")
check_field("collection", "collection", var_key="collections")
check_field("label", "label", var_key="labels")
check_field("mood", "mood", var_key="moods")
check_field("style", "style", var_key="styles")
check_field("similar", "similar_artist")
if self.type in util.advance_tags_to_edit:
for advance_edit in util.advance_tags_to_edit[self.type]:
key, options = item_advance_keys[f"item_{advance_edit}"]
if advance_edit in self.metadata_backup["exclude"] or not hasattr(item, key):
continue
keys = {v: k for k, v in options.items()}
if keys[getattr(item, key)] not in ["default", "all", "never"]:
attrs[advance_edit] = keys[getattr(item, key)]
def _recur(sub):
sub_items = {}
for sub_item in getattr(item, sub)():
sub_item_key, sub_item_attrs = self.get_locked_attributes(sub_item)
if sub_item_attrs:
sub_items[sub_item_key] = sub_item_attrs
if sub_items:
attrs[sub] = sub_items
if isinstance(item, Show):
_recur("seasons")
elif isinstance(item, Season):
_recur("episodes")
elif isinstance(item, Artist):
_recur("albums")
elif isinstance(item, Album):
_recur("tracks")
return map_key, attrs
def get_item_sort_title(self, item_to_sort, atr="titleSort"):
if isinstance(item_to_sort, Album):
return f"{getattr(item_to_sort.artist(), atr)} Album {getattr(item_to_sort, atr)}"
elif isinstance(item_to_sort, Season):
return f"{getattr(item_to_sort.show(), atr)} Season {item_to_sort.seasonNumber}"
elif isinstance(item_to_sort, Episode):
return f"{getattr(item_to_sort.show(), atr)} {item_to_sort.seasonEpisode.upper()}"
else:
return getattr(item_to_sort, atr)
def split(self, text):
attribute, modifier = os.path.splitext(str(text).lower())
attribute = method_alias[attribute] if attribute in method_alias else attribute
modifier = modifier_alias[modifier] if modifier in modifier_alias else modifier
if attribute == "add_to_arr":
attribute = "radarr_add_missing" if self.is_movie else "sonarr_add_missing"
elif attribute in ["arr_tag", "arr_folder"]:
attribute = f"{'rad' if self.is_movie else 'son'}{attribute}"
elif attribute in builder.date_attributes and modifier in [".gt", ".gte"]:
modifier = ".after"
elif attribute in builder.date_attributes and modifier in [".lt", ".lte"]:
modifier = ".before"
final = f"{attribute}{modifier}"
if text != final:
logger.warning(f"Collection Warning: {text} attribute will run as {final}")
return attribute, modifier, final
def check_filters(self, item, filters_in, current_time):
for filter_method, filter_data in filters_in:
filter_attr, modifier, filter_final = self.split(filter_method)
if self.check_filter(item, filter_attr, modifier, filter_final, filter_data, current_time) is False:
return False
return True
def check_filter(self, item, filter_attr, modifier, filter_final, filter_data, current_time):
filter_actual = attribute_translation[filter_attr] if filter_attr in attribute_translation else filter_attr
if isinstance(item, Movie):
item_type = "movie"
elif isinstance(item, Show):
item_type = "show"
elif isinstance(item, Season):
item_type = "season"
elif isinstance(item, Episode):
item_type = "episode"
elif isinstance(item, Artist):
item_type = "artist"
elif isinstance(item, Album):
item_type = "album"
elif isinstance(item, Track):
item_type = "track"
else:
return True
3 years ago
item = self.reload(item)
if filter_attr not in builder.filters[item_type]:
return True
elif filter_attr in builder.date_filters:
if util.is_date_filter(getattr(item, filter_actual), modifier, filter_data, filter_final, current_time):
return False
elif filter_attr in builder.string_filters:
values = []
if filter_attr == "audio_track_title":
for media in item.media:
for part in media.parts:
values.extend([a.extendedDisplayTitle for a in part.audioStreams() if a.extendedDisplayTitle])
elif filter_attr in ["audio_codec", "audio_profile", "video_codec", "video_profile"]:
for media in item.media:
attr = getattr(media, filter_actual)
if attr and attr not in values:
values.append(attr)
elif filter_attr in ["filepath", "folder"]:
values = [loc for loc in item.locations if loc]
else:
test_value = getattr(item, filter_actual)
values = [test_value] if test_value else []
if util.is_string_filter(values, modifier, filter_data):
return False
elif filter_attr in builder.boolean_filters:
filter_check = False
if filter_attr == "has_collection":
filter_check = len(item.collections) > 0
elif filter_attr == "has_edition":
filter_check = True if item.editionTitle else False
elif filter_attr == "has_stinger":
filter_check = False
if item.ratingKey in self.movie_rating_key_map and self.movie_rating_key_map[item.ratingKey] in self.config.mediastingers:
filter_check = True
elif filter_attr == "has_overlay":
for label in self.item_labels(item):
if label.tag.lower().endswith(" overlay") or label.tag.lower() == "overlay":
filter_check = True
break
elif filter_attr == "has_dolby_vision":
for media in item.media:
for part in media.parts:
for stream in part.videoStreams():
if stream.DOVIPresent:
filter_check = True
break
if util.is_boolean_filter(filter_data, filter_check):
return False
elif filter_attr == "history":
item_date = item.originallyAvailableAt
if item_date is None:
return False
elif filter_data == "day":
if item_date.month != current_time.month or item_date.day != current_time.day:
return False
elif filter_data == "month":
if item_date.month != current_time.month:
return False
else:
date_match = False
for i in range(filter_data):
check_date = current_time - timedelta(days=i)
if item_date.month == check_date.month and item_date.day == check_date.day:
date_match = True
if date_match is False:
return False
elif filter_attr in ["seasons", "episodes", "albums", "tracks"]:
if filter_attr == "seasons":
sub_items = item.seasons()
elif filter_attr == "albums":
sub_items = item.albums()
elif filter_attr == "tracks":
sub_items = item.tracks()
else:
sub_items = item.episodes()
filters_in = []
percentage = 60
for sub_atr, sub_data in filter_data.items():
if sub_atr == "percentage":
percentage = sub_data
else:
filters_in.append((sub_atr, sub_data))
failure_threshold = len(sub_items) * ((100 - percentage) / 100)
failures = 0
for sub_item in sub_items:
if self.check_filters(sub_item, filters_in, current_time) is False:
failures += 1
if failures > failure_threshold:
return False
elif (filter_attr != "year" and filter_attr in builder.number_filters) or modifier in [".gt", ".gte", ".lt", ".lte", ".count_gt", ".count_gte", ".count_lt", ".count_lte"]:
test_number = []
if filter_attr in ["channels", "height", "width", "aspect"]:
test_number = 0
for media in item.media:
attr = getattr(media, filter_actual)
if attr and attr > test_number:
test_number = attr
elif filter_attr == "stinger_rating":
test_number = None
if item.ratingKey in self.movie_rating_key_map and self.movie_rating_key_map[item.ratingKey] in self.config.mediastingers:
test_number = self.config.mediastingers[self.movie_rating_key_map[item.ratingKey]]
elif filter_attr == "versions":
test_number = len(item.media)
elif filter_attr == "audio_language":
for media in item.media:
for part in media.parts:
test_number.extend([a.language for a in part.audioStreams()])
elif filter_attr == "subtitle_language":
for media in item.media:
for part in media.parts:
test_number.extend([s.language for s in part.subtitleStreams()])
elif filter_attr == "duration":
2 years ago
test_number = getattr(item, filter_actual)
if test_number:
test_number /= 60000
else:
test_number = getattr(item, filter_actual)
if modifier in [".count_gt", ".count_gte", ".count_lt", ".count_lte"]:
test_number = len(test_number) if test_number else 0
modifier = f".{modifier[7:]}"
if test_number is None or util.is_number_filter(test_number, modifier, filter_data):
return False
else:
attrs = []
if filter_attr in ["resolution", "audio_language", "subtitle_language"]:
for media in item.media:
if filter_attr == "resolution":
attrs.append(media.videoResolution)
for part in media.parts:
if filter_attr == "audio_language":
2 years ago
for a in part.audioStreams():
attrs.extend([a.language, a.languageCode])
if filter_attr == "subtitle_language":
2 years ago
for s in part.subtitleStreams():
attrs.extend([s.language, s.languageCode])
elif filter_attr in ["content_rating", "year", "rating"]:
attrs = [getattr(item, filter_actual)]
elif filter_attr in ["actor", "country", "director", "genre", "label", "producer", "writer",
"collection", "network"]:
attrs = [attr.tag for attr in getattr(item, filter_actual)]
else:
raise Failed(f"Filter Error: filter: {filter_final} not supported")
if modifier == ".regex":
has_match = False
for reg in filter_data:
for name in attrs:
if re.compile(reg).search(name):
has_match = True
if has_match is False:
return False
elif (not list(set(filter_data) & set(attrs)) and modifier == "") \
or (list(set(filter_data) & set(attrs)) and modifier == ".not"):
return False
return True