# Plex-Meta-Manager/modules/builder.py

import logging, os, re
from datetime import datetime, timedelta
from modules import anidb, anilist, flixpatrol, icheckmovies, imdb, letterboxd, mal, plex, radarr, sonarr, stevenlu, tautulli, tmdb, trakt, tvdb, util
from modules.util import Failed, ImageData, NotScheduled
from PIL import Image
from plexapi.exceptions import BadRequest, NotFound
from plexapi.video import Movie, Show, Season, Episode
from urllib.parse import quote
# Module-wide logger, registered under the application's name.
logger = logging.getLogger("Plex Meta Manager")

# Attribute-name groups; the code that consumes them lies outside this section.
string_filters = ["title", "episode_title", "studio"]
advance_new_agent = ["item_metadata_language", "item_use_original_title"]
# "item_episode_sorting" used to be listed twice; every name now appears once.
advance_show = [
    "item_episode_sorting",
    "item_keep_episodes",
    "item_delete_episodes",
    "item_season_display",
]
# Alias -> canonical attribute name lookup table.
# Stray non-Python residue lines that were interleaved with the entries have
# been removed so the literal parses; keys, values and order are unchanged.
method_alias = {
    "actors": "actor", "role": "actor", "roles": "actor",
    "show_actor": "actor", "show_actors": "actor", "show_role": "actor", "show_roles": "actor",
    "collections": "collection", "plex_collection": "collection",
    "show_collections": "collection", "show_collection": "collection",
    "content_ratings": "content_rating", "contentRating": "content_rating", "contentRatings": "content_rating",
    "countries": "country",
    "decades": "decade",
    "directors": "director",
    "genres": "genre",
    "labels": "label",
    "rating": "critic_rating",
    "show_user_rating": "user_rating",
    "video_resolution": "resolution",
    "tmdb_trending": "tmdb_trending_daily",
    "play": "plays", "show_plays": "plays", "show_play": "plays", "episode_play": "episode_plays",
    "originally_available": "release", "episode_originally_available": "episode_air_date",
    "episode_release": "episode_air_date", "episode_released": "episode_air_date",
    "show_originally_available": "release", "show_release": "release", "show_air_date": "release",
    "released": "release", "show_released": "release", "max_age": "release",
    "studios": "studio",
    "networks": "network",
    "producers": "producer",
    "writers": "writer",
    "years": "year", "show_year": "year", "show_years": "year",
    "show_title": "title", "filter": "filters",
    "seasonyear": "year", "isadult": "adult", "startdate": "start", "enddate": "end", "averagescore": "score",
    "minimum_tag_percentage": "min_tag_percent", "minimumtagrank": "min_tag_percent", "minimum_tag_rank": "min_tag_percent",
    "anilist_tag": "anilist_search", "anilist_genre": "anilist_search", "anilist_season": "anilist_search",
    "mal_producer": "mal_studio", "mal_licensor": "mal_studio",
    "trakt_recommended": "trakt_recommended_weekly", "trakt_watched": "trakt_watched_weekly", "trakt_collected": "trakt_collected_weekly",
}
# Filter name -> translated attribute name.
# NOTE(review): the values look like plexapi attribute names -- confirm.
# A stray non-Python residue line inside the literal has been removed so it
# parses; keys, values and order are unchanged.
filter_translation = {
    "actor": "actors",
    "audience_rating": "audienceRating",
    "collection": "collections",
    "content_rating": "contentRating",
    "country": "countries",
    "critic_rating": "rating",
    "director": "directors",
    "genre": "genres",
    "label": "labels",
    "producer": "producers",
    "release": "originallyAvailableAt",
    "added": "addedAt",
    "last_played": "lastViewedAt",
    "plays": "viewCount",
    "user_rating": "userRating",
    "writer": "writers",
}
# Long-form modifier suffixes and the short suffix each one stands for.
modifier_alias = {
    ".greater": ".gt",
    ".less": ".lt",
}
# Concatenation of the `builders` sequence exposed by each source module,
# in the same order as before.
all_builders = (
    anidb.builders
    + anilist.builders
    + flixpatrol.builders
    + icheckmovies.builders
    + imdb.builders
    + letterboxd.builders
    + mal.builders
    + plex.builders
    + stevenlu.builders
    + tautulli.builders
    + tmdb.builders
    + trakt.builders
    + tvdb.builders
)
# Attribute names grouped by library type (per the constant names).
# A stray non-Python residue line inside `movie_only_builders` has been
# removed so the literal parses; entries and order are unchanged.
show_only_builders = [
    "tmdb_network", "tmdb_show", "tmdb_show_details", "tvdb_show", "tvdb_show_details",
    "collection_level", "item_tmdb_season_titles",
]
movie_only_builders = [
    "letterboxd_list", "letterboxd_list_details", "icheckmovies_list", "icheckmovies_list_details", "stevenlu_popular",
    "tmdb_collection", "tmdb_collection_details", "tmdb_movie", "tmdb_movie_details", "tmdb_now_playing",
    "tvdb_movie", "tvdb_movie_details", "trakt_boxoffice",
]
# Attribute names grouped by the kind of detail they supply
# (summary text, poster image, background image), per the constant names.
# A stray non-Python residue line between the assignments has been removed.
summary_details = [
    "summary", "tmdb_summary", "tmdb_description", "tmdb_biography", "tvdb_summary",
    "tvdb_description", "trakt_description", "letterboxd_description", "icheckmovies_description",
]
poster_details = ["url_poster", "tmdb_poster", "tmdb_profile", "tvdb_poster", "file_poster"]
background_details = ["url_background", "tmdb_background", "tvdb_background", "file_background"]
# Detail attribute names, grouped by category (per the constant names).
# Stray non-Python residue lines inside and between these literals have been
# removed so they parse; entries and order are unchanged.
boolean_details = [
    "visible_library", "visible_home", "visible_shared", "show_filtered", "show_missing", "save_missing",
    "missing_only_released", "only_filter_missing", "delete_below_minimum",
]
string_details = ["sort_title", "content_rating", "name_mapping"]
ignored_details = [
    "smart_filter", "smart_label", "smart_url", "run_again", "schedule", "sync_mode", "template", "test", "delete_not_scheduled",
    "tmdb_person", "build_collection", "collection_order", "collection_level", "validate_builders", "collection_name", "sort_by",
]
# `details` is its own seven names followed by the boolean and string groups.
details = [
    "ignore_ids", "ignore_imdb_ids", "server_preroll", "collection_changes_webhooks", "collection_mode",
    "collection_minimum", "label",
] + boolean_details + string_details
# Five literal names followed by the poster, background, summary and string
# detail groups, concatenated in that order.
collectionless_details = (
    ["collection_order", "plex_collectionless", "label", "label_sync_mode", "test"]
    + poster_details
    + background_details
    + summary_details
    + string_details
)
# Item-level attribute names grouped under the "bool" category.
item_bool_details = [
    "item_tmdb_season_titles",
    "item_assets",
    "revert_overlay",
    "item_lock_background",
    "item_lock_poster",
    "item_lock_title",
    "item_refresh",
]
# Four literal names, then the item bool group, then every key of
# plex.item_advance_keys.
item_details = (
    ["item_label", "item_radarr_tag", "item_sonarr_tag", "item_overlay"]
    + item_bool_details
    + [*plex.item_advance_keys.keys()]
)
# Names collected under `none_details`; the validation loop only warns about a
# blank value when the attribute is not in this list.
none_details = ["label.sync", "item_label.sync"]

# Radarr-related attribute names.
radarr_details = [
    "radarr_add", "radarr_add_existing", "radarr_folder", "radarr_monitor",
    "radarr_search", "radarr_availability", "radarr_quality", "radarr_tag",
]

# Sonarr-related attribute names.
sonarr_details = [
    "sonarr_add", "sonarr_add_existing", "sonarr_folder", "sonarr_monitor",
    "sonarr_language", "sonarr_series", "sonarr_quality", "sonarr_season",
    "sonarr_search", "sonarr_cutoff_search", "sonarr_tag",
]
# Names collected under `parts_collection_valid`: seventeen literal names
# followed by the summary, poster, background and string detail groups.
# Fix: a comma was missing after "collection_changes_webhooks", so Python's
# implicit string-literal concatenation fused it with "visible_home" into one
# bogus entry and neither real name was in the list.
parts_collection_valid = [
    "plex_search", "trakt_list", "trakt_list_details", "collection_mode", "label", "visible_library", "collection_changes_webhooks",
    "visible_home", "visible_shared", "show_missing", "save_missing", "missing_only_released", "server_preroll",
    "item_lock_background", "item_lock_poster", "item_lock_title", "item_refresh",
] + summary_details + poster_details + background_details + string_details
# Filter names, including their ".modifier" variants, collected under
# `all_filters`.
# Fixes:
#   * a comma was missing after "year.not", so implicit string-literal
#     concatenation fused it with "tmdb_year" into the bogus entry
#     "year.nottmdb_year" and neither real name was in the list;
#   * stray non-Python residue lines inside the literal have been removed.
all_filters = [
    "actor", "actor.not",
    "audio_language", "audio_language.not",
    "audio_track_title", "audio_track_title.not", "audio_track_title.is", "audio_track_title.isnot", "audio_track_title.begins", "audio_track_title.ends", "audio_track_title.regex",
    "collection", "collection.not",
    "content_rating", "content_rating.not",
    "country", "country.not",
    "director", "director.not",
    "filepath", "filepath.not", "filepath.is", "filepath.isnot", "filepath.begins", "filepath.ends", "filepath.regex",
    "genre", "genre.not",
    "label", "label.not",
    "producer", "producer.not",
    "release", "release.not", "release.before", "release.after", "release.regex", "history",
    "added", "added.not", "added.before", "added.after", "added.regex",
    "last_played", "last_played.not", "last_played.before", "last_played.after", "last_played.regex",
    "first_episode_aired", "first_episode_aired.not", "first_episode_aired.before", "first_episode_aired.after", "first_episode_aired.regex",
    "last_episode_aired", "last_episode_aired.not", "last_episode_aired.before", "last_episode_aired.after", "last_episode_aired.regex",
    "title", "title.not", "title.is", "title.isnot", "title.begins", "title.ends", "title.regex",
    "plays.gt", "plays.gte", "plays.lt", "plays.lte",
    "tmdb_vote_count.gt", "tmdb_vote_count.gte", "tmdb_vote_count.lt", "tmdb_vote_count.lte",
    "duration.gt", "duration.gte", "duration.lt", "duration.lte",
    "original_language", "original_language.not",
    "user_rating.gt", "user_rating.gte", "user_rating.lt", "user_rating.lte",
    "audience_rating.gt", "audience_rating.gte", "audience_rating.lt", "audience_rating.lte",
    "critic_rating.gt", "critic_rating.gte", "critic_rating.lt", "critic_rating.lte",
    "studio", "studio.not", "studio.is", "studio.isnot", "studio.begins", "studio.ends", "studio.regex",
    "subtitle_language", "subtitle_language.not",
    "resolution", "resolution.not",
    "writer", "writer.not", "has_collection",
    "year", "year.gt", "year.gte", "year.lt", "year.lte", "year.not",
    "tmdb_year", "tmdb_year.gt", "tmdb_year.gte", "tmdb_year.lt", "tmdb_year.lte", "tmdb_year.not",
]
# Filter-name groups (per the constant names: TMDb-related, movie-only,
# show-only) and the names collected under `smart_invalid`.
# Stray non-Python residue lines inside and between these literals have been
# removed so they parse; entries and order are unchanged.
tmdb_filters = ["original_language", "tmdb_vote_count", "tmdb_year", "first_episode_aired", "last_episode_aired"]
movie_only_filters = [
    "audio_language", "audio_language.not",
    "audio_track_title", "audio_track_title.not", "audio_track_title.is", "audio_track_title.isnot", "audio_track_title.begins", "audio_track_title.ends", "audio_track_title.regex",
    "country", "country.not",
    "director", "director.not",
    "duration.gt", "duration.gte", "duration.lt", "duration.lte",
    "original_language", "original_language.not",
    "subtitle_language", "subtitle_language.not",
    "resolution", "resolution.not",
    "writer", "writer.not",
]
show_only_filters = ["first_episode_aired", "last_episode_aired", "network"]
smart_invalid = ["collection_order", "collection_level"]
# Seven literal names followed by every Radarr and Sonarr detail name.
smart_url_invalid = (
    [
        "filters",
        "run_again",
        "sync_mode",
        "show_filtered",
        "show_missing",
        "save_missing",
        "smart_label",
    ]
    + radarr_details
    + sonarr_details
)
# Builder names collected under `custom_sort_builders`; order is kept as-is.
custom_sort_builders = [
    "plex_search",
    "tmdb_list", "tmdb_popular", "tmdb_now_playing", "tmdb_top_rated",
    "tmdb_trending_daily", "tmdb_trending_weekly", "tmdb_discover",
    "tvdb_list",
    "imdb_chart", "imdb_list",
    "stevenlu_popular",
    "anidb_popular",
    "trakt_list", "trakt_trending", "trakt_popular", "trakt_boxoffice",
    "trakt_collected_daily", "trakt_collected_weekly", "trakt_collected_monthly",
    "trakt_collected_yearly", "trakt_collected_all",
    "flixpatrol_url", "flixpatrol_demographics", "flixpatrol_popular", "flixpatrol_top",
    "trakt_recommended_daily", "trakt_recommended_weekly", "trakt_recommended_monthly",
    "trakt_recommended_yearly", "trakt_recommended_all",
    "trakt_watched_daily", "trakt_watched_weekly", "trakt_watched_monthly",
    "trakt_watched_yearly", "trakt_watched_all",
    "tautulli_popular", "tautulli_watched",
    "letterboxd_list",
    "icheckmovies_list",
    "anilist_top_rated", "anilist_popular", "anilist_season", "anilist_studio",
    "anilist_genre", "anilist_tag", "anilist_search",
    "mal_all", "mal_airing", "mal_upcoming", "mal_tv", "mal_movie", "mal_ova", "mal_special",
    "mal_popular", "mal_favorite", "mal_suggested", "mal_userlist",
    "mal_season", "mal_genre", "mal_studio",
]
class CollectionBuilder:
3 years ago
def __init__(self, config, library, metadata, name, no_missing, data, playlist=False):
self.config = config
self.library = library
self.metadata = metadata
self.mapping_name = name
self.no_missing = no_missing
self.data = data
3 years ago
self.playlist = playlist
3 years ago
self.language = self.library.Plex.language
self.details = {
"show_filtered": self.library.show_filtered,
"show_missing": self.library.show_missing,
"save_missing": self.library.save_missing,
"missing_only_released": self.library.missing_only_released,
"only_filter_missing": self.library.only_filter_missing,
"create_asset_folders": self.library.create_asset_folders,
"delete_below_minimum": self.library.delete_below_minimum,
"delete_not_scheduled": self.library.delete_not_scheduled,
"collection_changes_webhooks": self.library.collection_changes_webhooks
}
self.item_details = {}
self.radarr_details = {}
self.sonarr_details = {}
self.missing_movies = []
self.missing_shows = []
self.missing_parts = []
self.builders = []
self.filters = []
self.tmdb_filters = []
self.rating_keys = []
self.filtered_keys = {}
self.run_again_movies = []
self.run_again_shows = []
3 years ago
self.notification_additions = []
self.notification_removals = []
self.items = []
self.posters = {}
self.backgrounds = {}
self.summaries = {}
4 years ago
self.schedule = ""
self.minimum = self.library.collection_minimum
self.ignore_ids = [i for i in self.library.ignore_ids]
self.ignore_imdb_ids = [i for i in self.library.ignore_imdb_ids]
self.server_preroll = None
self.current_time = datetime.now()
self.current_year = self.current_time.year
self.collection_poster = None
self.collection_background = None
self.exists = False
self.created = False
self.deleted = False
3 years ago
self.type = "playlist" if self.playlist else "collection"
self.Type = self.type.capitalize()
methods = {m.lower(): m for m in self.data}
3 years ago
if f"{self.type}_name" in methods:
logger.debug("")
3 years ago
logger.debug(f"Validating Method: {self.type}_name")
if not self.data[methods[f"{self.type}_name"]]:
raise Failed(f"{self.Type} Error: {self.type}_name attribute is blank")
logger.debug(f"Value: {self.data[methods['{}_name'.format(self.type)]]}")
self.name = self.data[methods[f"{self.type}_name"]]
else:
self.name = self.mapping_name
if "template" in methods:
logger.debug("")
logger.debug("Validating Method: template")
if not self.metadata.templates:
3 years ago
raise Failed(f"{self.Type} Error: No templates found")
elif not self.data[methods["template"]]:
3 years ago
raise Failed(f"{self.Type} Error: template attribute is blank")
else:
logger.debug(f"Value: {self.data[methods['template']]}")
4 years ago
for variables in util.get_list(self.data[methods["template"]], split=False):
if not isinstance(variables, dict):
3 years ago
raise Failed(f"{self.Type} Error: template attribute is not a dictionary")
4 years ago
elif "name" not in variables:
3 years ago
raise Failed(f"{self.Type} Error: template sub-attribute name is required")
4 years ago
elif not variables["name"]:
3 years ago
raise Failed(f"{self.Type} Error: template sub-attribute name is blank")
4 years ago
elif variables["name"] not in self.metadata.templates:
3 years ago
raise Failed(f"{self.Type} Error: template {variables['name']} not found")
4 years ago
elif not isinstance(self.metadata.templates[variables["name"]], dict):
3 years ago
raise Failed(f"{self.Type} Error: template {variables['name']} is not a dictionary")
else:
4 years ago
for tm in variables:
if not variables[tm]:
3 years ago
raise Failed(f"{self.Type} Error: template sub-attribute {tm} is blank")
4 years ago
if "collection_name" not in variables:
variables["collection_name"] = str(self.name)
4 years ago
template_name = variables["name"]
template = self.metadata.templates[template_name]
4 years ago
default = {}
if "default" in template:
if template["default"]:
if isinstance(template["default"], dict):
for dv in template["default"]:
4 years ago
if template["default"][dv]:
default[dv] = template["default"][dv]
else:
3 years ago
raise Failed(f"{self.Type} Error: template default sub-attribute {dv} is blank")
else:
3 years ago
raise Failed(f"{self.Type} Error: template sub-attribute default is not a dictionary")
else:
3 years ago
raise Failed(f"{self.Type} Error: template sub-attribute default is blank")
4 years ago
optional = []
if "optional" in template:
if template["optional"]:
for op in util.get_list(template["optional"]):
if op not in default:
optional.append(str(op))
else:
logger.warning(f"Template Warning: variable {op} cannot be optional if it has a default")
4 years ago
else:
3 years ago
raise Failed(f"{self.Type} Error: template sub-attribute optional is blank")
if "move_collection_prefix" in template:
if template["move_collection_prefix"]:
for op in util.get_list(template["move_collection_prefix"]):
variables["collection_name"] = variables["collection_name"].replace(f"{str(op).strip()} ", "") + f", {str(op).strip()}"
else:
3 years ago
raise Failed(f"{self.Type} Error: template sub-attribute move_collection_prefix is blank")
4 years ago
def check_data(_data):
if isinstance(_data, dict):
final_data = {}
for sm, sd in _data.items():
4 years ago
try:
4 years ago
final_data[sm] = check_data(sd)
4 years ago
except Failed:
continue
4 years ago
elif isinstance(_data, list):
final_data = []
for li in _data:
try:
final_data.append(check_data(li))
except Failed:
continue
else:
txt = str(_data)
4 years ago
def scan_text(og_txt, var, var_value):
4 years ago
if og_txt == f"<<{var}>>":
4 years ago
return str(var_value)
4 years ago
elif f"<<{var}>>" in str(og_txt):
return str(og_txt).replace(f"<<{var}>>", str(var_value))
else:
return og_txt
for option in optional:
if option not in variables and f"<<{option}>>" in txt:
raise Failed
for variable, variable_data in variables.items():
if variable != "name":
4 years ago
txt = scan_text(txt, variable, variable_data)
4 years ago
for dm, dd in default.items():
4 years ago
txt = scan_text(txt, dm, dd)
if txt in ["true", "True"]:
final_data = True
elif txt in ["false", "False"]:
final_data = False
else:
4 years ago
try:
num_data = float(txt)
final_data = int(num_data) if num_data.is_integer() else num_data
except (ValueError, TypeError):
final_data = txt
4 years ago
return final_data
for method_name, attr_data in template.items():
if method_name not in self.data and method_name not in ["default", "optional", "move_collection_prefix"]:
4 years ago
if attr_data is None:
logger.error(f"Template Error: template attribute {method_name} is blank")
continue
try:
self.data[method_name] = check_data(attr_data)
methods[method_name.lower()] = method_name
except Failed:
continue
if "delete_not_scheduled" in methods:
logger.debug("")
logger.debug("Validating Method: delete_not_scheduled")
logger.debug(f"Value: {data[methods['delete_not_scheduled']]}")
3 years ago
self.details["delete_not_scheduled"] = self._parse("delete_not_scheduled", self.data, datatype="bool", methods=methods, default=False)
if "schedule" in methods and not config.requested_collections:
logger.debug("")
logger.debug("Validating Method: schedule")
if not self.data[methods["schedule"]]:
3 years ago
raise Failed(f"{self.Type} Error: schedule attribute is blank")
else:
logger.debug(f"Value: {self.data[methods['schedule']]}")
skip_collection = True
schedule_list = util.get_list(self.data[methods["schedule"]])
next_month = self.current_time.replace(day=28) + timedelta(days=4)
last_day = next_month - timedelta(days=next_month.day)
for schedule in schedule_list:
run_time = str(schedule).lower()
if run_time.startswith(("day", "daily")):
skip_collection = False
elif run_time == "never":
self.schedule += f"\nNever scheduled to run"
elif run_time.startswith(("hour", "week", "month", "year", "range")):
match = re.search("\\(([^)]+)\\)", run_time)
if not match:
3 years ago
logger.error(f"{self.Type} Error: failed to parse schedule: {schedule}")
continue
param = match.group(1)
if run_time.startswith("hour"):
try:
if 0 <= int(param) <= 23:
3 years ago
self.schedule += f"\nScheduled to run only on the {util.make_ordinal(int(param))} hour"
3 years ago
if self.config.run_hour == int(param):
skip_collection = False
else:
raise ValueError
except ValueError:
3 years ago
logger.error(f"{self.Type} Error: hourly schedule attribute {schedule} invalid must be an integer between 0 and 23")
elif run_time.startswith("week"):
if param.lower() not in util.days_alias:
3 years ago
logger.error(f"{self.Type} Error: weekly schedule attribute {schedule} invalid must be a day of the week i.e. weekly(Monday)")
continue
weekday = util.days_alias[param.lower()]
self.schedule += f"\nScheduled weekly on {util.pretty_days[weekday]}"
if weekday == self.current_time.weekday():
skip_collection = False
elif run_time.startswith("month"):
try:
if 1 <= int(param) <= 31:
3 years ago
self.schedule += f"\nScheduled monthly on the {util.make_ordinal(int(param))}"
if self.current_time.day == int(param) or (self.current_time.day == last_day.day and int(param) > last_day.day):
skip_collection = False
else:
raise ValueError
except ValueError:
3 years ago
logger.error(f"{self.Type} Error: monthly schedule attribute {schedule} invalid must be an integer between 1 and 31")
elif run_time.startswith("year"):
3 years ago
try:
if "/" in param:
opt = param.split("/")
month = int(opt[0])
day = int(opt[1])
self.schedule += f"\nScheduled yearly on {util.pretty_months[month]} {util.make_ordinal(day)}"
if self.current_time.month == month and (self.current_time.day == day or (self.current_time.day == last_day.day and day > last_day.day)):
skip_collection = False
else:
raise ValueError
except ValueError:
3 years ago
logger.error(f"{self.Type} Error: yearly schedule attribute {schedule} invalid must be in the MM/DD format i.e. yearly(11/22)")
elif run_time.startswith("range"):
match = re.match("^(1[0-2]|0?[1-9])/(3[01]|[12][0-9]|0?[1-9])-(1[0-2]|0?[1-9])/(3[01]|[12][0-9]|0?[1-9])$", param)
if not match:
3 years ago
logger.error(f"{self.Type} Error: range schedule attribute {schedule} invalid must be in the MM/DD-MM/DD format i.e. range(12/01-12/25)")
continue
3 years ago
def check_day(_m, _d):
if _m in [1, 3, 5, 7, 8, 10, 12] and _d > 31:
return _m, 31
elif _m in [4, 6, 9, 11] and _d > 30:
return _m, 30
elif _m == 2 and _d > 28:
return _m, 28
else:
return _m, _d
month_start, day_start = check_day(int(match.group(1)), int(match.group(2)))
month_end, day_end = check_day(int(match.group(3)), int(match.group(4)))
month_check, day_check = check_day(self.current_time.month, self.current_time.day)
check = datetime.strptime(f"{month_check}/{day_check}", "%m/%d")
start = datetime.strptime(f"{month_start}/{day_start}", "%m/%d")
end = datetime.strptime(f"{month_end}/{day_end}", "%m/%d")
self.schedule += f"\nScheduled between {util.pretty_months[month_start]} {util.make_ordinal(day_start)} and {util.pretty_months[month_end]} {util.make_ordinal(day_end)}"
if start <= check <= end if start < end else check <= end or check >= start:
skip_collection = False
else:
3 years ago
logger.error(f"{self.Type} Error: schedule attribute {schedule} invalid")
if len(self.schedule) == 0:
skip_collection = False
if skip_collection:
suffix = ""
if self.details["delete_not_scheduled"]:
try:
self.obj = self.library.get_collection(self.name)
self.delete_collection()
self.deleted = True
suffix = f" and was deleted"
except Failed:
suffix = f" and could not be found to delete"
raise NotScheduled(f"{self.schedule}\n\nCollection {self.name} not scheduled to run{suffix}")
self.collectionless = "plex_collectionless" in methods
self.validate_builders = True
if "validate_builders" in methods:
logger.debug("")
logger.debug("Validating Method: validate_builders")
logger.debug(f"Value: {data[methods['validate_builders']]}")
3 years ago
self.validate_builders = self._parse("validate_builders", self.data, datatype="bool", methods=methods, default=True)
self.run_again = False
if "run_again" in methods:
logger.debug("")
logger.debug("Validating Method: run_again")
logger.debug(f"Value: {data[methods['run_again']]}")
3 years ago
self.run_again = self._parse("run_again", self.data, datatype="bool", methods=methods, default=False)
self.build_collection = True
if "build_collection" in methods:
logger.debug("")
logger.debug("Validating Method: build_collection")
logger.debug(f"Value: {data[methods['build_collection']]}")
3 years ago
self.build_collection = self._parse("build_collection", self.data, datatype="bool", methods=methods, default=True)
self.sync = self.library.sync_mode == "sync"
if "sync_mode" in methods:
logger.debug("")
logger.debug("Validating Method: sync_mode")
if not self.data[methods["sync_mode"]]:
logger.warning(f"Collection Warning: sync_mode attribute is blank using general: {self.library.sync_mode}")
else:
logger.debug(f"Value: {self.data[methods['sync_mode']]}")
if self.data[methods["sync_mode"]].lower() not in ["append", "sync"]:
logger.warning(f"Collection Warning: {self.data[methods['sync_mode']]} sync_mode invalid using general: {self.library.sync_mode}")
else:
self.sync = self.data[methods["sync_mode"]].lower() == "sync"
self.custom_sort = False
if "collection_order" in methods:
logger.debug("")
logger.debug("Validating Method: collection_order")
if self.data[methods["collection_order"]] is None:
raise Failed(f"Collection Warning: collection_order attribute is blank")
else:
logger.debug(f"Value: {self.data[methods['collection_order']]}")
if self.data[methods["collection_order"]].lower() in plex.collection_order_options:
self.details["collection_order"] = self.data[methods["collection_order"]].lower()
if self.data[methods["collection_order"]].lower() == "custom" and self.build_collection:
self.custom_sort = True
else:
3 years ago
raise Failed(f"{self.Type} Error: {self.data[methods['collection_order']]} collection_order invalid\n\trelease (Order Collection by release dates)\n\talpha (Order Collection Alphabetically)\n\tcustom (Custom Order Collection)")
self.sort_by = None
if "sort_by" in methods:
logger.debug("")
logger.debug("Validating Method: sort_by")
if self.data[methods["sort_by"]] is None:
raise Failed(f"{self.Type} Error: sort_by attribute is blank")
else:
logger.debug(f"Value: {self.data[methods['sort_by']]}")
if (self.library.is_movie and self.data[methods["sort_by"]] not in plex.movie_sorts) or (self.library.is_show and self.data[methods["sort_by"]] in plex.show_sorts):
raise Failed(f"{self.Type} Error: sort_by attribute {self.data[methods['sort_by']]} invalid")
else:
self.sort_by = self.data[methods["sort_by"]]
self.collection_level = "movie" if self.library.is_movie else "show"
if "collection_level" in methods:
logger.debug("")
logger.debug("Validating Method: collection_level")
if self.library.is_movie:
3 years ago
raise Failed(f"{self.Type} Error: collection_level attribute only works for show libraries")
elif self.data[methods["collection_level"]] is None:
3 years ago
raise Failed(f"{self.Type} Error: collection_level attribute is blank")
else:
logger.debug(f"Value: {self.data[methods['collection_level']]}")
if self.data[methods["collection_level"]].lower() in plex.collection_level_options:
self.collection_level = self.data[methods["collection_level"]].lower()
else:
3 years ago
raise Failed(f"{self.Type} Error: {self.data[methods['collection_level']]} collection_level invalid\n\tseason (Collection at the Season Level)\n\tepisode (Collection at the Episode Level)")
self.parts_collection = self.collection_level in ["season", "episode"]
self.media_type = self.collection_level.capitalize()
if "tmdb_person" in methods:
logger.debug("")
logger.debug("Validating Method: tmdb_person")
if not self.data[methods["tmdb_person"]]:
3 years ago
raise Failed(f"{self.Type} Error: tmdb_person attribute is blank")
else:
logger.debug(f"Value: {self.data[methods['tmdb_person']]}")
valid_names = []
for tmdb_id in util.get_int_list(self.data[methods["tmdb_person"]], "TMDb Person ID"):
3 years ago
person = self.config.TMDb.get_person(tmdb_id)
valid_names.append(person.name)
if hasattr(person, "biography") and person.biography:
self.summaries["tmdb_person"] = person.biography
if hasattr(person, "profile_path") and person.profile_path:
3 years ago
self.posters["tmdb_person"] = f"{self.config.TMDb.image_url}{person.profile_path}"
if len(valid_names) > 0:
self.details["tmdb_person"] = valid_names
else:
3 years ago
raise Failed(f"{self.Type} Error: No valid TMDb Person IDs in {self.data[methods['tmdb_person']]}")
self.smart_sort = "random"
self.smart_label_collection = False
if "smart_label" in methods:
logger.debug("")
logger.debug("Validating Method: smart_label")
self.smart_label_collection = True
if not self.data[methods["smart_label"]]:
3 years ago
logger.warning(f"{self.Type} Error: smart_label attribute is blank defaulting to random")
else:
logger.debug(f"Value: {self.data[methods['smart_label']]}")
if (self.library.is_movie and str(self.data[methods["smart_label"]]).lower() in plex.movie_sorts) \
or (self.library.is_show and str(self.data[methods["smart_label"]]).lower() in plex.show_sorts):
self.smart_sort = str(self.data[methods["smart_label"]]).lower()
else:
3 years ago
logger.warning(f"{self.Type} Error: smart_label attribute: {self.data[methods['smart_label']]} is invalid defaulting to random")
self.smart_url = None
self.smart_type_key = None
self.smart_filter_details = ""
if "smart_url" in methods:
logger.debug("")
logger.debug("Validating Method: smart_url")
if not self.data[methods["smart_url"]]:
3 years ago
raise Failed(f"{self.Type} Error: smart_url attribute is blank")
else:
logger.debug(f"Value: {self.data[methods['smart_url']]}")
try:
self.smart_url, self.smart_type_key = self.library.get_smart_filter_from_uri(self.data[methods["smart_url"]])
except ValueError:
3 years ago
raise Failed(f"{self.Type} Error: smart_url is incorrectly formatted")
if "smart_filter" in methods:
self.smart_type_key, self.smart_filter_details, self.smart_url = self.build_filter("smart_filter", self.data[methods["smart_filter"]], smart=True)
def cant_interact(attr1, attr2, fail=False):
if getattr(self, attr1) and getattr(self, attr2):
3 years ago
message = f"{self.Type} Error: {attr1} & {attr2} attributes cannot go together"
if fail:
raise Failed(message)
else:
setattr(self, attr2, False)
logger.info("")
logger.warning(f"{message} removing {attr2}")
cant_interact("smart_label_collection", "collectionless")
cant_interact("smart_url", "collectionless")
cant_interact("smart_url", "run_again")
cant_interact("smart_label_collection", "smart_url", fail=True)
cant_interact("smart_label_collection", "parts_collection", fail=True)
cant_interact("smart_url", "parts_collection", fail=True)
cant_interact("smart_url", "sort_by")
cant_interact("smart_label_collection", "sort_by")
cant_interact("parts_collection", "sort_by")
self.smart = self.smart_url or self.smart_label_collection
for method_key, method_data in self.data.items():
method_name, method_mod, method_final = self._split(method_key)
if method_name in ignored_details:
continue
logger.debug("")
logger.debug(f"Validating Method: {method_key}")
logger.debug(f"Value: {method_data}")
try:
3 years ago
if method_data is None and method_name in all_builders + plex.searches: raise Failed(f"{self.Type} Error: {method_final} attribute is blank")
elif method_data is None and method_final not in none_details: logger.warning(f"Collection Warning: {method_final} attribute is blank")
3 years ago
elif not self.config.Trakt and "trakt" in method_name: raise Failed(f"{self.Type} Error: {method_final} requires Trakt to be configured")
elif not self.library.Radarr and "radarr" in method_name: raise Failed(f"{self.Type} Error: {method_final} requires Radarr to be configured")
elif not self.library.Sonarr and "sonarr" in method_name: raise Failed(f"{self.Type} Error: {method_final} requires Sonarr to be configured")
elif not self.library.Tautulli and "tautulli" in method_name: raise Failed(f"{self.Type} Error: {method_final} requires Tautulli to be configured")
elif not self.config.MyAnimeList and "mal" in method_name: raise Failed(f"{self.Type} Error: {method_final} requires MyAnimeList to be configured")
elif self.library.is_movie and method_name in show_only_builders: raise Failed(f"{self.Type} Error: {method_final} attribute only works for show libraries")
elif self.library.is_show and method_name in movie_only_builders: raise Failed(f"{self.Type} Error: {method_final} attribute only works for movie libraries")
elif self.library.is_show and method_name in plex.movie_only_searches: raise Failed(f"{self.Type} Error: {method_final} plex search only works for movie libraries")
elif self.library.is_movie and method_name in plex.show_only_searches: raise Failed(f"{self.Type} Error: {method_final} plex search only works for show libraries")
elif self.parts_collection and method_name not in parts_collection_valid: raise Failed(f"{self.Type} Error: {method_final} attribute does not work with Collection Level: {self.collection_level.capitalize()}")
elif self.smart and method_name in smart_invalid: raise Failed(f"{self.Type} Error: {method_final} attribute only works with normal collections")
elif self.collectionless and method_name not in collectionless_details: raise Failed(f"{self.Type} Error: {method_final} attribute does not work for Collectionless collection")
elif self.smart_url and method_name in all_builders + smart_url_invalid: raise Failed(f"{self.Type} Error: {method_final} builder not allowed when using smart_filter")
elif method_name in summary_details: self._summary(method_name, method_data)
elif method_name in poster_details: self._poster(method_name, method_data)
elif method_name in background_details: self._background(method_name, method_data)
elif method_name in details: self._details(method_name, method_data, method_final, methods)
elif method_name in item_details: self._item_details(method_name, method_data, method_mod, method_final, methods)
elif method_name in radarr_details: self._radarr(method_name, method_data)
elif method_name in sonarr_details: self._sonarr(method_name, method_data)
elif method_name in anidb.builders: self._anidb(method_name, method_data)
elif method_name in anilist.builders: self._anilist(method_name, method_data)
elif method_name in flixpatrol.builders: self._flixpatrol(method_name, method_data)
elif method_name in icheckmovies.builders: self._icheckmovies(method_name, method_data)
elif method_name in letterboxd.builders: self._letterboxd(method_name, method_data)
elif method_name in imdb.builders: self._imdb(method_name, method_data)
elif method_name in mal.builders: self._mal(method_name, method_data)
elif method_name in plex.builders or method_final in plex.searches: self._plex(method_name, method_data)
elif method_name in stevenlu.builders: self._stevenlu(method_name, method_data)
elif method_name in tautulli.builders: self._tautulli(method_name, method_data)
elif method_name in tmdb.builders: self._tmdb(method_name, method_data)
elif method_name in trakt.builders: self._trakt(method_name, method_data)
elif method_name in tvdb.builders: self._tvdb(method_name, method_data)
elif method_name == "filters": self._filters(method_name, method_data)
3 years ago
else: raise Failed(f"{self.Type} Error: {method_final} attribute not supported")
except Failed as e:
if self.validate_builders:
raise
else:
logger.error(e)
if self.custom_sort and len(self.builders) > 1:
3 years ago
raise Failed(f"{self.Type} Error: collection_order: custom can only be used with a single builder per collection")
if self.custom_sort and self.builders[0][0] not in custom_sort_builders:
3 years ago
raise Failed(f"{self.Type} Error: collection_order: custom cannot be used with {self.builders[0][0]}")
if "add" not in self.radarr_details:
self.radarr_details["add"] = self.library.Radarr.add if self.library.Radarr else False
if "add_existing" not in self.radarr_details:
self.radarr_details["add_existing"] = self.library.Radarr.add_existing if self.library.Radarr else False
if "add" not in self.sonarr_details:
self.sonarr_details["add"] = self.library.Sonarr.add if self.library.Sonarr else False
if "add_existing" not in self.sonarr_details:
self.sonarr_details["add_existing"] = self.library.Sonarr.add_existing if self.library.Sonarr else False
if self.smart_url or self.collectionless:
self.radarr_details["add"] = False
self.radarr_details["add_existing"] = False
self.sonarr_details["add"] = False
self.sonarr_details["add_existing"] = False
3 years ago
if self.radarr_details["add_existing"] or self.sonarr_details["add_existing"]:
self.item_details["add_existing"] = True
4 years ago
if self.collectionless:
self.details["collection_mode"] = "hide"
self.sync = True
self.do_missing = not self.no_missing and (self.details["show_missing"] or self.details["save_missing"]
or (self.library.Radarr and self.radarr_details["add"])
or (self.library.Sonarr and self.sonarr_details["add"]))
if self.build_collection:
try:
self.obj = self.library.get_collection(self.name)
if (self.smart and not self.obj.smart) or (not self.smart and self.obj.smart):
logger.info("")
3 years ago
logger.error(f"{self.Type} Error: Converting {self.obj.title} to a {'smart' if self.smart else 'normal'} collection")
self.library.query(self.obj.delete)
self.obj = None
except Failed:
self.obj = None
self.plex_map = {}
if self.sync and self.obj:
for item in self.library.get_collection_items(self.obj, self.smart_label_collection):
self.plex_map[item.ratingKey] = item
if self.obj:
self.exists = True
else:
self.obj = None
self.sync = False
self.run_again = False
logger.info("")
logger.info("Validation Successful")
3 years ago
def _parse(self, attribute, data, datatype=None, methods=None, parent=None, default=None, options=None, translation=None, minimum=1, maximum=None, regex=None):
    """Validate one attribute value and return it in normalised form.

    Args:
        attribute: Attribute name, used for the lookup and in messages.
        data: The raw value, or (when ``methods`` is given) the dictionary
            that holds it.
        datatype: ``"list"``, ``"intlist"``, ``"dictlist"``, ``"bool"``,
            ``"int"``, ``"float"``, or ``None`` for a plain/option value.
        methods: Optional map of attribute name -> actual key in ``data``.
        parent: Optional enclosing builder name, prefixed to messages.
        default: Fallback used (with a warning) when validation fails;
            ``None`` means a failure raises instead.
        options: Accepted values for a plain value (compared lower-cased).
        translation: Map of accepted lower-case value -> value to return.
        minimum: Lower bound for ``int``/``float`` values.
        maximum: Optional upper bound for ``int``/``float`` values.
        regex: Optional ``(pattern, example)`` pair the value must match.

    Returns:
        The validated value; for ``"dictlist"`` a list of
        ``(dict, {lowercase key: original key})`` pairs; otherwise the
        (translated) default when validation failed and one was given.

    Raises:
        Failed: When validation fails and no default was supplied, or a
            ``"dictlist"`` entry is not a dictionary.
    """
    display = f"{parent + ' ' if parent else ''}{attribute} attribute"
    if options is None and translation is not None:
        options = [o for o in translation]
    # A methods map that lacks the attribute means the value is absent; the
    # enclosing dictionary must not be mistaken for the value itself.
    missing = bool(methods) and attribute not in methods
    value = data[methods[attribute]] if methods and attribute in methods else data

    if datatype == "list":
        if missing or not value:
            return []
        return [v for v in value if v] if isinstance(value, list) else [str(value)]
    elif datatype == "intlist":
        if not missing and value:
            try:
                return [int(v) for v in value if v] if isinstance(value, list) else [int(value)]
            except ValueError:
                # Entries that are not integers deliberately yield an empty list.
                pass
        return []
    elif datatype == "dictlist":
        final_list = []
        for dict_data in util.get_list(value):
            if isinstance(dict_data, dict):
                # Pair each dictionary with a lowercase-key lookup map.
                final_list.append((dict_data, {dm.lower(): dm for dm in dict_data}))
            else:
                raise Failed(f"{self.Type} Error: {display} {dict_data} is not a dictionary")
        return final_list
    elif missing:
        message = f"{display} not found"
    elif value is None:
        message = f"{display} is blank"
    elif regex is not None:
        regex_str, example = regex
        if re.compile(regex_str).match(str(value)):
            return str(value)
        else:
            message = f"{display}: {value} must match pattern {regex_str} e.g. {example}"
    elif datatype == "bool":
        if isinstance(value, bool):
            return value
        elif isinstance(value, int):
            return value > 0
        elif str(value).lower() in ["t", "true"]:
            return True
        elif str(value).lower() in ["f", "false"]:
            return False
        else:
            message = f"{display} must be either true or false"
    elif datatype in ["int", "float"]:
        try:
            value = int(str(value)) if datatype == "int" else float(str(value))
            if (maximum is None and minimum <= value) or (maximum is not None and minimum <= value <= maximum):
                return value
        except ValueError:
            # Fall through to the range/type message below.
            pass
        pre = f"{display} {value} must be {'an integer' if datatype == 'int' else 'a number'}"
        if maximum is None:
            message = f"{pre} {minimum} or greater"
        else:
            message = f"{pre} between {minimum} and {maximum}"
    elif (translation is not None and str(value).lower() not in translation) or \
            (options is not None and translation is None and str(value).lower() not in options):
        message = f"{display} {value} must be in {', '.join([str(o) for o in options])}"
    else:
        # The membership test above is case-insensitive, so the lookup has to
        # use the same lower-cased key or a value such as "Members" raises KeyError.
        return translation[str(value).lower()] if translation is not None else value

    if default is None:
        raise Failed(f"{self.Type} Error: {message}")
    else:
        logger.warning(f"{self.Type} Warning: {message} using {default} as default")
        return translation[default] if translation is not None else default
def _summary(self, method_name, method_data):
    """Resolve a summary attribute to text and record it in ``self.summaries``.

    ``summary`` stores ``method_data`` as given; every other recognised name
    asks the matching service on ``self.config`` for the text. A name that is
    not recognised leaves ``self.summaries`` untouched.
    """
    if method_name == "summary":
        text = method_data
    elif method_name == "tmdb_summary":
        fetch = self.config.TMDb.get_movie_show_or_collection
        text = fetch(util.regex_first_int(method_data, "TMDb ID"), self.library.is_movie).overview
    elif method_name == "tmdb_description":
        fetch = self.config.TMDb.get_list
        text = fetch(util.regex_first_int(method_data, "TMDb List ID")).description
    elif method_name == "tmdb_biography":
        fetch = self.config.TMDb.get_person
        text = fetch(util.regex_first_int(method_data, "TMDb Person ID")).biography
    elif method_name == "tvdb_summary":
        text = self.config.TVDb.get_item(method_data, self.library.is_movie).summary
    elif method_name == "tvdb_description":
        text = self.config.TVDb.get_list_description(method_data)
    elif method_name == "trakt_description":
        trakt_api = self.config.Trakt
        describe = trakt_api.list_description
        # Only the first validated Trakt list supplies the description.
        text = describe(trakt_api.validate_trakt(method_data, self.library.is_movie)[0])
    elif method_name == "letterboxd_description":
        text = self.config.Letterboxd.get_list_description(method_data, self.language)
    elif method_name == "icheckmovies_description":
        text = self.config.ICheckMovies.get_list_description(method_data, self.language)
    else:
        return
    self.summaries[method_name] = text
def _poster(self, method_name, method_data):
    """Resolve a poster attribute to a URL or file path in ``self.posters``.

    Raises:
        Failed: For ``file_poster`` when the given path does not exist.
    """
    if method_name == "file_poster":
        if not os.path.exists(method_data):
            raise Failed(f"{self.Type} Error: Poster Path Does Not Exist: {os.path.abspath(method_data)}")
        self.posters[method_name] = os.path.abspath(method_data)
        return

    if method_name == "url_poster":
        location = method_data
    elif method_name == "tmdb_poster":
        fetch = self.config.TMDb.get_movie_show_or_collection
        url_slug = fetch(util.regex_first_int(method_data, 'TMDb ID'), self.library.is_movie).poster_path
        location = f"{self.config.TMDb.image_url}{url_slug}"
    elif method_name == "tmdb_profile":
        fetch = self.config.TMDb.get_person
        url_slug = fetch(util.regex_first_int(method_data, 'TMDb Person ID')).profile_path
        location = f"{self.config.TMDb.image_url}{url_slug}"
    elif method_name == "tvdb_poster":
        tvdb_item = self.config.TVDb.get_item(method_data, self.library.is_movie)
        location = f"{tvdb_item.poster_path}"
    else:
        # Unrecognised poster attributes are ignored.
        return
    self.posters[method_name] = location
def _background(self, method_name, method_data):
    """Resolve a background attribute to a URL or file path in ``self.backgrounds``.

    Fixes two copy/paste slips from the poster handler: ``tmdb_background``
    now reads the TMDb backdrop (not the poster) path, and
    ``tvdb_background`` is stored in ``self.backgrounds`` rather than
    ``self.posters``.

    Raises:
        Failed: For ``file_background`` when the given path does not exist.
    """
    if method_name == "url_background":
        self.backgrounds[method_name] = method_data
    elif method_name == "tmdb_background":
        tmdb_item = self.config.TMDb.get_movie_show_or_collection(util.regex_first_int(method_data, 'TMDb ID'), self.library.is_movie)
        # Backgrounds come from the backdrop image, not the poster.
        url_slug = tmdb_item.backdrop_path
        self.backgrounds[method_name] = f"{self.config.TMDb.image_url}{url_slug}"
    elif method_name == "tvdb_background":
        tvdb_item = self.config.TVDb.get_item(method_data, self.library.is_movie)
        self.backgrounds[method_name] = f"{tvdb_item.background_path}"
    elif method_name == "file_background":
        if os.path.exists(method_data):
            self.backgrounds[method_name] = os.path.abspath(method_data)
        else:
            raise Failed(f"{self.Type} Error: Background Path Does Not Exist: {os.path.abspath(method_data)}")
def _details(self, method_name, method_data, method_final, methods):
    """Validate a collection-level detail attribute and store the result.

    Args:
        method_name: Base attribute name (modifier removed).
        method_data: Raw value from the definition.
        method_final: Attribute name including any modifier such as ``.sync``.
        methods: Container of the attribute names present in the definition.

    Raises:
        Failed: For an invalid ``collection_mode`` or conflicting label
            attributes.
    """
    if method_name == "collection_mode":
        mode_key = str(method_data).lower()
        if mode_key not in plex.collection_mode_options:
            raise Failed(f"{self.Type} Error: {method_data} collection_mode invalid\n\tdefault (Library default)\n\thide (Hide Collection)\n\thide_items (Hide Items in this Collection)\n\tshow_items (Show this Collection and its Items)")
        self.details[method_name] = plex.collection_mode_options[mode_key]
    elif method_name == "collection_minimum":
        self.minimum = self._parse(method_name, method_data, datatype="int", minimum=1)
    elif method_name == "server_preroll":
        self.server_preroll = self._parse(method_name, method_data)
    elif method_name == "ignore_ids":
        extra_ids = self._parse(method_name, method_data, datatype="intlist")
        self.ignore_ids.extend(extra_ids)
    elif method_name == "ignore_imdb_ids":
        extra_imdb_ids = self._parse(method_name, method_data, datatype="list")
        self.ignore_imdb_ids.extend(extra_imdb_ids)
    elif method_name == "label":
        has_sync = "label.sync" in methods
        if has_sync and "label" in methods:
            raise Failed(f"{self.Type} Error: Cannot use label and label.sync together")
        if has_sync and "label.remove" in methods:
            raise Failed(f"{self.Type} Error: Cannot use label.remove and label.sync together")
        # A plain ``label`` combined with ``label_sync_mode: sync`` is stored
        # under the ``label.sync`` key instead.
        sync_via_mode = method_final == "label" and "label_sync_mode" in methods and self.data[methods["label_sync_mode"]] == "sync"
        labels = util.get_list(method_data) if method_data else []
        self.details["label.sync" if sync_via_mode else method_final] = labels
    elif method_name == "collection_changes_webhooks":
        self.details[method_name] = self._parse(method_name, method_data, datatype="list")
    elif method_name in boolean_details:
        # Any value already stored for this detail acts as the fallback.
        fallback = self.details.get(method_name)
        self.details[method_name] = self._parse(method_name, method_data, datatype="bool", default=fallback)
    elif method_name in string_details:
        self.details[method_name] = str(method_data)
def _item_details(self, method_name, method_data, method_mod, method_final, methods):
    """Validate an item-level attribute and store it in ``self.item_details``.

    Args:
        method_name: Base attribute name (modifier removed).
        method_data: Raw value from the definition.
        method_mod: Modifier suffix including its leading character, or a
            falsy value when there is none.
        method_final: Attribute name including any modifier.
        methods: Container of the attribute names present in the definition.

    Raises:
        Failed: For conflicting label/tag attributes, a missing overlay
            image, or an overlay already used in this library.
    """
    if method_name == "item_label":
        sync_used = "item_label.sync" in methods
        if sync_used and "item_label" in methods:
            raise Failed(f"{self.Type} Error: Cannot use item_label and item_label.sync together")
        if sync_used and "item_label.remove" in methods:
            raise Failed(f"{self.Type} Error: Cannot use item_label.remove and item_label.sync together")
        self.item_details[method_final] = util.get_list(method_data) if method_data else []
    elif method_name in ["item_radarr_tag", "item_sonarr_tag"]:
        plain_used = method_name in methods
        sync_used = f"{method_name}.sync" in methods
        remove_used = f"{method_name}.remove" in methods
        if plain_used and sync_used:
            raise Failed(f"{self.Type} Error: Cannot use {method_name} and {method_name}.sync together")
        if remove_used and sync_used:
            raise Failed(f"{self.Type} Error: Cannot use {method_name}.remove and {method_name}.sync together")
        if plain_used and remove_used:
            raise Failed(f"{self.Type} Error: Cannot use {method_name} and {method_name}.remove together")
        self.item_details[method_name] = util.get_list(method_data)
        # Keep the modifier without its leading separator character.
        self.item_details["apply_tags"] = method_mod[1:] if method_mod else ""
    elif method_name == "item_overlay":
        overlay = os.path.join(self.config.default_dir, "overlays", method_data, "overlay.png")
        if not os.path.exists(overlay):
            raise Failed(f"{self.Type} Error: {method_data} overlay image not found at {overlay}")
        if method_data in self.library.overlays:
            raise Failed("Each Overlay can only be used once per Library")
        self.library.overlays.append(method_data)
        self.item_details[method_name] = method_data
    elif method_name in item_bool_details:
        # Only a true value is recorded; false leaves the detail unset.
        if self._parse(method_name, method_data, datatype="bool", default=False):
            self.item_details[method_name] = True
    elif method_name in plex.item_advance_keys:
        _, allowed = plex.item_advance_keys[method_name]
        chosen = str(method_data).lower()
        if method_name in advance_new_agent and self.library.agent not in plex.new_plex_agents:
            problem = f"Metadata Error: {method_name} attribute only works for with the New Plex Movie Agent and New Plex TV Agent"
        elif method_name in advance_show and not self.library.is_show:
            problem = f"Metadata Error: {method_name} attribute only works for show libraries"
        elif chosen not in allowed:
            problem = f"Metadata Error: {method_data} {method_name} attribute invalid"
        else:
            problem = None
        # Invalid advanced settings are logged and skipped, not raised.
        if problem is None:
            self.item_details[method_name] = chosen
        else:
            logger.error(problem)
def _radarr(self, method_name, method_data):
    """Validate a ``radarr_*`` attribute and store it in ``self.radarr_details``.

    The stored key is the attribute name without its ``radarr_`` prefix.

    Raises:
        Failed: When ``radarr_availability`` is not a known availability.
    """
    setting = method_name[7:]  # text after the "radarr_" prefix
    if method_name in ["radarr_add", "radarr_add_existing", "radarr_monitor", "radarr_search"]:
        self.radarr_details[setting] = self._parse(method_name, method_data, datatype="bool")
    elif method_name in ("radarr_folder", "radarr_quality"):
        # Stored as given, without validation.
        self.radarr_details[setting] = method_data
    elif method_name == "radarr_availability":
        availability = str(method_data).lower()
        if availability not in radarr.availability_translation:
            raise Failed(f"{self.Type} Error: {method_name} attribute must be either announced, cinemas, released or db")
        self.radarr_details["availability"] = availability
    elif method_name == "radarr_tag":
        self.radarr_details["tag"] = util.get_list(method_data)
def _sonarr(self, method_name, method_data):
    """Validate a ``sonarr_*`` attribute and store it in ``self.sonarr_details``.

    The stored key is the attribute name without its ``sonarr_`` prefix.

    Raises:
        Failed: When ``sonarr_monitor`` or ``sonarr_series`` is not one of
            the known options.
    """
    setting = method_name[7:]  # text after the "sonarr_" prefix
    if method_name in ["sonarr_add", "sonarr_add_existing", "sonarr_season", "sonarr_search", "sonarr_cutoff_search"]:
        self.sonarr_details[setting] = self._parse(method_name, method_data, datatype="bool")
    elif method_name in ["sonarr_folder", "sonarr_quality", "sonarr_language"]:
        # Stored as given, without validation.
        self.sonarr_details[setting] = method_data
    elif method_name == "sonarr_monitor":
        monitor = str(method_data).lower()
        if monitor not in sonarr.monitor_translation:
            raise Failed(f"{self.Type} Error: {method_name} attribute must be either all, future, missing, existing, pilot, first, latest or none")
        self.sonarr_details["monitor"] = monitor
    elif method_name == "sonarr_series":
        series_type = str(method_data).lower()
        if series_type not in sonarr.series_types:
            raise Failed(f"{self.Type} Error: {method_name} attribute must be either standard, daily, or anime")
        self.sonarr_details["series"] = series_type
    elif method_name == "sonarr_tag":
        self.sonarr_details["tag"] = util.get_list(method_data)
def _anidb(self, method_name, method_data):
    """Validate an AniDB builder and append it to ``self.builders``.

    Raises:
        Failed: When an ``anidb_tag`` entry has no ``tag`` or a blank one.
    """
    if method_name == "anidb_popular":
        amount = self._parse(method_name, method_data, datatype="int", default=30, maximum=30)
        self.builders.append((method_name, amount))
    elif method_name in ["anidb_id", "anidb_relation"]:
        for valid_id in self.config.AniDB.validate_anidb_ids(method_data, self.language):
            self.builders.append((method_name, valid_id))
    elif method_name == "anidb_tag":
        for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
            if "tag" not in dict_methods:
                raise Failed(f"{self.Type} Error: anidb_tag tag attribute is required")
            raw_tag = dict_data[dict_methods["tag"]]
            if not raw_tag:
                raise Failed(f"{self.Type} Error: anidb_tag tag attribute is blank")
            tag_builder = {"tag": util.regex_first_int(raw_tag, "AniDB Tag ID")}
            tag_builder["limit"] = self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name, minimum=0)
            self.builders.append((method_name, tag_builder))
def _anilist(self, method_name, method_data):
    """Validate an AniList builder and append it to ``self.builders``.

    ``anilist_search`` entries are validated attribute by attribute; a blank
    ``season`` or ``year`` falls back to the current season / default year.

    Changes from the previous version: the missing-year warning uses
    ``self.Type`` like every other message instead of a hard-coded
    "Collection", and ``limit`` accepts ``0`` (its own default) without a
    spurious warning.

    Raises:
        Failed: For an unsupported or blank search attribute, or a search
            with no valid options.
    """
    if method_name in ["anilist_id", "anilist_relations", "anilist_studio"]:
        for anilist_id in self.config.AniList.validate_anilist_ids(method_data, studio=method_name == "anilist_studio"):
            self.builders.append((method_name, anilist_id))
    elif method_name in ["anilist_popular", "anilist_trending", "anilist_top_rated"]:
        self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10)))
    elif method_name == "anilist_search":
        month = self.current_time.month
        if month in [12, 1, 2]:
            current_season = "winter"
        elif month in [3, 4, 5]:
            current_season = "spring"
        elif month in [6, 7, 8]:
            current_season = "summer"
        else:
            current_season = "fall"
        # December is grouped with winter, so it defaults to the next year.
        default_year = self.current_year + 1 if month == 12 else self.current_year
        for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
            new_dictionary = {}
            for search_method, search_data in dict_data.items():
                search_attr, _, search_final = self._split(search_method)
                if search_final not in anilist.searches:
                    raise Failed(f"{self.Type} Error: {method_name} {search_final} attribute not supported")
                elif search_attr == "season":
                    new_dictionary[search_attr] = self._parse(search_attr, search_data, parent=method_name, default=current_season, options=util.seasons)
                    if "year" not in dict_methods:
                        logger.warning(f"{self.Type} Warning: {method_name} year attribute not found using this year: {default_year} by default")
                        new_dictionary["year"] = default_year
                elif search_attr == "year":
                    new_dictionary[search_attr] = self._parse(search_attr, search_data, datatype="int", parent=method_name, default=default_year, minimum=1917, maximum=default_year + 1)
                elif search_data is None:
                    # season/year above have defaults; everything else must be set.
                    raise Failed(f"{self.Type} Error: {method_name} {search_final} attribute is blank")
                elif search_attr == "adult":
                    new_dictionary[search_attr] = self._parse(search_attr, search_data, datatype="bool", parent=method_name)
                elif search_attr == "country":
                    new_dictionary[search_attr] = self._parse(search_attr, search_data, options=anilist.country_codes, parent=method_name)
                elif search_attr == "source":
                    new_dictionary[search_attr] = self._parse(search_attr, search_data, options=anilist.media_source, parent=method_name)
                elif search_attr in ["episodes", "duration", "score", "popularity"]:
                    new_dictionary[search_final] = self._parse(search_final, search_data, datatype="int", parent=method_name)
                elif search_attr in ["format", "status", "genre", "tag", "tag_category"]:
                    new_dictionary[search_final] = self.config.AniList.validate(search_attr.replace("_", " ").title(), self._parse(search_final, search_data))
                elif search_attr in ["start", "end"]:
                    new_dictionary[search_final] = util.validate_date(search_data, f"{method_name} {search_final} attribute", return_as="%m/%d/%Y")
                elif search_attr == "min_tag_percent":
                    new_dictionary[search_attr] = self._parse(search_attr, search_data, datatype="int", parent=method_name, minimum=0, maximum=100)
                elif search_attr == "search":
                    new_dictionary[search_attr] = str(search_data)
                elif search_final not in ["sort_by", "limit"]:
                    raise Failed(f"{self.Type} Error: {method_name} {search_final} attribute not supported")
            if len(new_dictionary) == 0:
                raise Failed(f"{self.Type} Error: {method_name} must have at least one valid search option")
            new_dictionary["sort_by"] = self._parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=anilist.sort_options)
            # The default is 0, so 0 must also be accepted when given explicitly.
            new_dictionary["limit"] = self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name, minimum=0)
            self.builders.append((method_name, new_dictionary))
def _flixpatrol(self, method_name, method_data):
    """Validate a FlixPatrol builder and append it to ``self.builders``.

    URL builders are expanded into one ``flixpatrol_url`` builder per
    validated list. Dictionary builders are only appended when
    ``validate_flixpatrol_dict`` returns a truthy result.
    """
    if method_name.startswith("flixpatrol_url"):
        for flixpatrol_list in self.config.FlixPatrol.validate_flixpatrol_lists(method_data, self.language, self.library.is_movie):
            self.builders.append(("flixpatrol_url", flixpatrol_list))
    elif method_name in flixpatrol.builders:
        for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):

            def field(name, **extra):
                # Parse one key of the current dictionary entry.
                return self._parse(name, dict_data, methods=dict_methods, parent=method_name, **extra)

            if method_name == "flixpatrol_demographics":
                data = {
                    "generation": field("generation", options=flixpatrol.generations),
                    "gender": field("gender", default="all", options=flixpatrol.gender),
                    "location": field("location", default="world", options=flixpatrol.demo_locations),
                    "limit": field("limit", datatype="int", default=10),
                }
            elif method_name == "flixpatrol_popular":
                data = {
                    "source": field("source", options=flixpatrol.popular),
                    "time_window": field("time_window", default="today"),
                    "limit": field("limit", datatype="int", default=10),
                }
            elif method_name == "flixpatrol_top":
                data = {
                    "platform": field("platform", options=flixpatrol.platforms),
                    "location": field("location", default="world", options=flixpatrol.locations),
                    "time_window": field("time_window", default="today"),
                    "limit": field("limit", datatype="int", default=10),
                }
            else:
                continue
            if self.config.FlixPatrol.validate_flixpatrol_dict(method_name, data, self.language, self.library.is_movie):
                self.builders.append((method_name, data))
def _icheckmovies(self, method_name, method_data):
    """Append an ``icheckmovies_list`` builder for every validated list.

    For a ``*_details`` attribute the description of the first validated
    list is also stored in ``self.summaries``.
    """
    if not method_name.startswith("icheckmovies_list"):
        return
    validated = self.config.ICheckMovies.validate_icheckmovies_lists(method_data, self.language)
    for entry in validated:
        self.builders.append(("icheckmovies_list", entry))
    if method_name.endswith("_details"):
        self.summaries[method_name] = self.config.ICheckMovies.get_list_description(validated[0], self.language)
def _imdb(self, method_name, method_data):
    """Validate an IMDb builder and append it to ``self.builders``.

    Raises:
        Failed: For an ``imdb_id`` not starting with ``tt``, or an
            ``imdb_chart`` that is unknown or does not match the library type.
    """
    if method_name == "imdb_id":
        for value in util.get_list(method_data):
            if not str(value).startswith("tt"):
                raise Failed(f"{self.Type} Error: imdb_id {value} must begin with tt")
            self.builders.append((method_name, value))
    elif method_name == "imdb_list":
        for imdb_dict in self.config.IMDb.validate_imdb_lists(method_data, self.language):
            self.builders.append((method_name, imdb_dict))
    elif method_name == "imdb_chart":
        for value in util.get_list(method_data):
            if value in imdb.movie_charts and not self.library.is_movie:
                raise Failed(f"{self.Type} Error: chart: {value} does not work with show libraries")
            if value in imdb.show_charts and self.library.is_movie:
                raise Failed(f"{self.Type} Error: chart: {value} does not work with movie libraries")
            if value not in imdb.charts:
                raise Failed(f"{self.Type} Error: chart: {value} is invalid options are {[i for i in imdb.charts]}")
            self.builders.append((method_name, value))
def _letterboxd(self, method_name, method_data):
    """Append a ``letterboxd_list`` builder for every validated list.

    For a ``*_details`` attribute the description of the first validated
    list is also stored in ``self.summaries``.
    """
    if not method_name.startswith("letterboxd_list"):
        return
    validated = self.config.Letterboxd.validate_letterboxd_lists(method_data, self.language)
    for entry in validated:
        self.builders.append(("letterboxd_list", entry))
    if method_name.endswith("_details"):
        self.summaries[method_name] = self.config.Letterboxd.get_list_description(validated[0], self.language)
def _mal(self, method_name, method_data):
    """Validate a MyAnimeList builder and append it to ``self.builders``.

    Changes from the previous version: ``mal_genre`` / ``mal_studio`` now
    parse the normalised entries, so a bare ID is accepted instead of being
    rejected as "not a dictionary", and their ``limit`` accepts ``0`` (its
    own default) without a spurious warning.
    """
    if method_name == "mal_id":
        for mal_id in util.get_int_list(method_data, "MyAnimeList ID"):
            self.builders.append((method_name, mal_id))
    elif method_name in ["mal_all", "mal_airing", "mal_upcoming", "mal_tv", "mal_ova", "mal_movie", "mal_special", "mal_popular", "mal_favorite", "mal_suggested"]:
        # mal_suggested is capped at 100; the other rankings at 500.
        self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10, maximum=100 if method_name == "mal_suggested" else 500)))
    elif method_name in ["mal_season", "mal_userlist"]:
        for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
            if method_name == "mal_season":
                if self.current_time.month in [1, 2, 3]:
                    default_season = "winter"
                elif self.current_time.month in [4, 5, 6]:
                    default_season = "spring"
                elif self.current_time.month in [7, 8, 9]:
                    default_season = "summer"
                else:
                    default_season = "fall"
                self.builders.append((method_name, {
                    "season": self._parse("season", dict_data, methods=dict_methods, parent=method_name, default=default_season, options=util.seasons),
                    "sort_by": self._parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="members", options=mal.season_sort_options, translation=mal.season_sort_translation),
                    "year": self._parse("year", dict_data, datatype="int", methods=dict_methods, default=self.current_year, parent=method_name, minimum=1917, maximum=self.current_year + 1),
                    "limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=500)
                }))
            elif method_name == "mal_userlist":
                self.builders.append((method_name, {
                    "username": self._parse("username", dict_data, methods=dict_methods, parent=method_name),
                    "status": self._parse("status", dict_data, methods=dict_methods, parent=method_name, default="all", options=mal.userlist_status),
                    "sort_by": self._parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=mal.userlist_sort_options, translation=mal.userlist_sort_translation),
                    "limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=1000)
                }))
    elif method_name in ["mal_genre", "mal_studio"]:
        id_name = f"{method_name[4:]}_id"
        # Wrap each non-dictionary entry as {<id_name>: entry, "limit": 0} so
        # every entry reaches the dictlist parser as a dictionary.
        final_data = [
            entry if isinstance(entry, dict) else {id_name: entry, "limit": 0}
            for entry in util.get_list(method_data)
        ]
        for dict_data, dict_methods in self._parse(method_name, final_data, datatype="dictlist"):
            self.builders.append((method_name, {
                id_name: self._parse(id_name, dict_data, datatype="int", methods=dict_methods, parent=method_name, maximum=999999),
                # The default is 0, so 0 must also be accepted when given explicitly.
                "limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name, minimum=0)
            }))
def _plex(self, method_name, method_data):
    """Register the Plex builder named by ``method_name`` in ``self.builders``.

    ``plex_all`` is stored with the value ``True``. ``plex_search`` and
    ``plex_collectionless`` are parsed as a list of dictionaries and stored as
    one builder per dictionary. Any other method is wrapped into a
    ``plex_search`` whose ``any`` base is ``{method_name: method_data}``.

    Raises:
        Failed: when a ``plex_collectionless`` entry lists no exclusions.
    """
    if method_name == "plex_all":
        self.builders.append((method_name, True))
        return
    if method_name not in ["plex_search", "plex_collectionless"]:
        wrapped = {"any": {method_name: method_data}}
        self.builders.append(("plex_search", self.build_filter("plex_search", wrapped)))
        return
    for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
        if method_name == "plex_search":
            if self.collection_level in plex.collection_level_options:
                type_override = f"{self.collection_level}s"
            else:
                type_override = None
            builder_value = self.build_filter("plex_search", dict_data, type_override=type_override)
        else:
            prefixes = self._parse("exclude_prefix", dict_data, datatype="list", methods=dict_methods)
            exclusions = self._parse("exclude", dict_data, datatype="list", methods=dict_methods)
            if len(prefixes) == 0 and len(exclusions) == 0:
                raise Failed(f"{self.Type} Error: you must have at least one exclusion")
            # The collection being built is always excluded from itself.
            exclusions.append(self.name)
            builder_value = {"exclude_prefix": prefixes, "exclude": exclusions}
        self.builders.append((method_name, builder_value))
def _stevenlu(self, method_name, method_data):
    """Parse ``method_data`` with ``self._parse`` (third argument ``"bool"``) and store the result as a builder."""
    enabled = self._parse(method_name, method_data, "bool")
    self.builders.append((method_name, enabled))
def _tautulli(self, method_name, method_data):
    """Store one Tautulli builder per dictionary found in ``method_data``.

    ``list_type`` is ``"popular"`` for ``tautulli_popular`` and ``"watched"``
    for every other method name. The four integer settings are parsed from the
    dictionary with the defaults listed in ``int_defaults``.
    """
    list_type = "popular" if method_name == "tautulli_popular" else "watched"
    int_defaults = (("list_days", 30), ("list_size", 10), ("list_buffer", 20), ("list_minimum", 0))
    for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
        settings = {"list_type": list_type}
        for key, fallback in int_defaults:
            settings[key] = self._parse(key, dict_data, datatype="int", methods=dict_methods, default=fallback, parent=method_name)
        self.builders.append((method_name, settings))
def _tmdb(self, method_name, method_data):
    """Validate a TMDb builder and register it in ``self.builders``.

    * ``tmdb_discover``: every dictionary in ``method_data`` is checked
      attribute by attribute and stored as one builder dictionary.
    * Chart builders (popular, top rated, now playing, trending): the value
      is parsed as an integer that defaults to 10.
    * Anything else: the value goes through ``validate_tmdb_ids`` and one
      builder is stored per returned ID. ``*_details`` methods also copy the
      summary, poster and background of the first ID into the collection.

    Raises:
        Failed: when a discover attribute is blank, unsupported, used in the
            wrong library type, missing its companion attribute, or when no
            discover attribute other than ``limit`` was supplied.
    """
    if method_name == "tmdb_discover":
        for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
            new_dictionary = {"limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name)}
            for discover_method, discover_data in dict_data.items():
                discover_attr, _, discover_final = self._split(discover_method)
                if discover_data is None:
                    raise Failed(f"{self.Type} Error: {method_name} {discover_final} attribute is blank")
                elif discover_final not in tmdb.discover_all:
                    raise Failed(f"{self.Type} Error: {method_name} {discover_final} attribute not supported")
                elif self.library.is_movie and discover_attr in tmdb.discover_tv_only:
                    raise Failed(f"{self.Type} Error: {method_name} {discover_final} attribute only works for show libraries")
                elif self.library.is_show and discover_attr in tmdb.discover_movie_only:
                    raise Failed(f"{self.Type} Error: {method_name} {discover_final} attribute only works for movie libraries")
                elif discover_attr in ["language", "region"]:
                    regex = ("([a-z]{2})-([A-Z]{2})", "en-US") if discover_attr == "language" else ("^[A-Z]{2}$", "US")
                    new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, parent=method_name, regex=regex)
                elif discover_attr == "sort_by":
                    # Previously guarded by "and self.library.is_movie", which made the
                    # TV option list unreachable and rejected sort_by for show libraries.
                    options = tmdb.discover_movie_sort if self.library.is_movie else tmdb.discover_tv_sort
                    new_dictionary[discover_final] = self._parse(discover_attr, discover_data, parent=method_name, options=options)
                elif discover_attr == "certification_country":
                    if "certification" in dict_data or "certification.lte" in dict_data or "certification.gte" in dict_data:
                        new_dictionary[discover_final] = discover_data
                    else:
                        raise Failed(f"{self.Type} Error: {method_name} {discover_attr} attribute: must be used with either certification, certification.lte, or certification.gte")
                elif discover_attr == "certification":
                    if "certification_country" in dict_data:
                        new_dictionary[discover_final] = discover_data
                    else:
                        raise Failed(f"{self.Type} Error: {method_name} {discover_final} attribute: must be used with certification_country")
                elif discover_attr == "watch_region":
                    if "with_watch_providers" in dict_data or "without_watch_providers" in dict_data or "with_watch_monetization_types" in dict_data:
                        new_dictionary[discover_final] = discover_data
                    else:
                        raise Failed(f"{self.Type} Error: {method_name} {discover_final} attribute: must be used with either with_watch_providers, without_watch_providers, or with_watch_monetization_types")
                elif discover_attr == "with_watch_monetization_types":
                    if "watch_region" in dict_data:
                        new_dictionary[discover_final] = self._parse(discover_attr, discover_data, parent=method_name, options=tmdb.discover_monetization_types)
                    else:
                        raise Failed(f"{self.Type} Error: {method_name} {discover_final} attribute: must be used with watch_region")
                elif discover_attr in tmdb.discover_booleans:
                    new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="bool", parent=method_name)
                elif discover_attr == "vote_average":
                    new_dictionary[discover_final] = self._parse(discover_final, discover_data, datatype="float", parent=method_name)
                elif discover_attr == "with_status":
                    new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="int", parent=method_name, minimum=0, maximum=5)
                elif discover_attr == "with_type":
                    new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="int", parent=method_name, minimum=0, maximum=6)
                elif discover_final in tmdb.discover_dates:
                    new_dictionary[discover_final] = util.validate_date(discover_data, f"{method_name} {discover_final} attribute", return_as="%m/%d/%Y")
                elif discover_attr in tmdb.discover_years:
                    new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="int", parent=method_name, minimum=1800, maximum=self.current_year + 1)
                elif discover_attr in tmdb.discover_ints:
                    new_dictionary[discover_final] = self._parse(discover_final, discover_data, datatype="int", parent=method_name)
                elif discover_final in tmdb.discover_strings:
                    new_dictionary[discover_final] = discover_data
                elif discover_attr != "limit":
                    raise Failed(f"{self.Type} Error: {method_name} {discover_final} attribute not supported")
            # "limit" is always present, so more than one key means a real attribute was given.
            if len(new_dictionary) > 1:
                self.builders.append((method_name, new_dictionary))
            else:
                raise Failed(f"{self.Type} Error: {method_name} had no valid fields")
    elif method_name in ["tmdb_popular", "tmdb_top_rated", "tmdb_now_playing", "tmdb_trending_daily", "tmdb_trending_weekly"]:
        self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10)))
    else:
        values = self.config.TMDb.validate_tmdb_ids(method_data, method_name)
        if method_name.endswith("_details"):
            # Only the first ID supplies the collection's summary and artwork.
            if method_name.startswith(("tmdb_collection", "tmdb_movie", "tmdb_show")):
                item = self.config.TMDb.get_movie_show_or_collection(values[0], self.library.is_movie)
                if hasattr(item, "overview") and item.overview:
                    self.summaries[method_name] = item.overview
                if hasattr(item, "backdrop_path") and item.backdrop_path:
                    self.backgrounds[method_name] = f"{self.config.TMDb.image_url}{item.backdrop_path}"
                if hasattr(item, "poster_path") and item.poster_path:
                    self.posters[method_name] = f"{self.config.TMDb.image_url}{item.poster_path}"
            elif method_name.startswith(("tmdb_actor", "tmdb_crew", "tmdb_director", "tmdb_producer", "tmdb_writer")):
                item = self.config.TMDb.get_person(values[0])
                if hasattr(item, "biography") and item.biography:
                    self.summaries[method_name] = item.biography
                if hasattr(item, "profile_path") and item.profile_path:
                    self.posters[method_name] = f"{self.config.TMDb.image_url}{item.profile_path}"
            elif method_name.startswith("tmdb_list"):
                item = self.config.TMDb.get_list(values[0])
                if hasattr(item, "description") and item.description:
                    self.summaries[method_name] = item.description
        for value in values:
            # The builder is stored under the method name without its "_details" suffix.
            self.builders.append((method_name[:-8] if method_name.endswith("_details") else method_name, value))
def _trakt(self, method_name, method_data):
    """Validate a Trakt builder and register it in ``self.builders``.

    Methods starting with ``trakt_list`` are stored as ``trakt_list`` builders,
    one per validated list; a ``_details`` suffix also stores the description of
    the first list as the summary. ``trakt_watchlist`` and ``trakt_collection``
    are validated with the matching ``trakt_type``. ``trakt_boxoffice`` must
    parse to a true value and is stored with the value 10. Other names found in
    ``trakt.builders`` are stored with an integer that defaults to 10.

    Raises:
        Failed: when ``trakt_boxoffice`` is not set to true.
    """
    if method_name.startswith("trakt_list"):
        trakt_lists = self.config.Trakt.validate_trakt(method_data, self.library.is_movie)
        for entry in trakt_lists:
            self.builders.append(("trakt_list", entry))
        if method_name.endswith("_details"):
            self.summaries[method_name] = self.config.Trakt.list_description(trakt_lists[0])
        return
    if method_name in ["trakt_watchlist", "trakt_collection"]:
        # Everything after the "trakt_" prefix names the list type.
        trakt_type = method_name[6:]
        for entry in self.config.Trakt.validate_trakt(method_data, self.library.is_movie, trakt_type=trakt_type):
            self.builders.append((method_name, entry))
        return
    if method_name == "trakt_boxoffice":
        if not self._parse(method_name, method_data, datatype="bool", default=False):
            raise Failed(f"{self.Type} Error: {method_name} must be set to true")
        self.builders.append((method_name, 10))
        return
    if method_name in trakt.builders:
        amount = self._parse(method_name, method_data, datatype="int", default=10)
        self.builders.append((method_name, amount))
def _tvdb(self, method_name, method_data):
    """Register one TVDb builder per value in ``method_data``.

    For ``*_details`` methods the first value also supplies collection details:
    movie/show methods copy description, background and poster from the item
    returned by ``TVDb.get_item``; list methods copy the list description.
    Builders are stored under the method name without its ``_details`` suffix.
    """
    values = util.get_list(method_data)
    wants_details = method_name.endswith("_details")
    if wants_details:
        if method_name.startswith(("tvdb_movie", "tvdb_show")):
            item = self.config.TVDb.get_item(values[0], method_name.startswith("tvdb_movie"))
            description = getattr(item, "description", None)
            if description:
                self.summaries[method_name] = description
            background_path = getattr(item, "background_path", None)
            if background_path:
                self.backgrounds[method_name] = f"{self.config.TMDb.image_url}{background_path}"
            poster_path = getattr(item, "poster_path", None)
            if poster_path:
                self.posters[method_name] = f"{self.config.TMDb.image_url}{poster_path}"
        elif method_name.startswith("tvdb_list"):
            self.summaries[method_name] = self.config.TVDb.get_list_description(values[0])
    builder_name = method_name[:-8] if wants_details else method_name
    for value in values:
        self.builders.append((builder_name, value))
def _filters(self, method_name, method_data):
    """Validate collection filters and store them for later use.

    Each dictionary in ``method_data`` may carry a boolean ``validate`` key.
    When it is true (the default) an invalid filter raises ``Failed``; when it
    is false the problem is logged and the filter is skipped. Valid filters
    whose attribute is listed in the module-level ``tmdb_filters`` go to
    ``self.tmdb_filters``; all others go to ``self.filters``.

    Raises:
        Failed: when ``validate`` is blank or not a boolean, or when a filter
            is invalid and validation is enabled.
    """
    for dict_data, _ in self._parse(method_name, method_data, datatype="dictlist"):
        validate = True
        if "validate" in dict_data:
            if dict_data["validate"] is None:
                raise Failed(f"{self.Type} Error: validate filter attribute is blank")
            if not isinstance(dict_data["validate"], bool):
                raise Failed(f"{self.Type} Error: validate filter attribute must be either true or false")
            validate = dict_data["validate"]
        for filter_method, filter_data in dict_data.items():
            filter_attr, modifier, filter_final = self._split(filter_method)
            message = None
            if filter_final not in all_filters:
                message = f"{self.Type} Error: {filter_final} is not a valid filter attribute"
            elif filter_final in movie_only_filters and self.library.is_show:
                message = f"{self.Type} Error: {filter_final} filter attribute only works for movie libraries"
            elif filter_final in show_only_filters and self.library.is_movie:
                message = f"{self.Type} Error: {filter_final} filter attribute only works for show libraries"
            elif filter_data is None:
                # The original tested filter_final, which is always a string,
                # so blank filter values were never reported.
                message = f"{self.Type} Error: {filter_final} filter attribute is blank"
            elif filter_attr in tmdb_filters:
                self.tmdb_filters.append((filter_final, self.validate_attribute(filter_attr, modifier, f"{filter_final} filter", filter_data, validate)))
            else:
                self.filters.append((filter_final, self.validate_attribute(filter_attr, modifier, f"{filter_final} filter", filter_data, validate)))
            if message:
                if validate:
                    raise Failed(message)
                else:
                    logger.error(message)
def find_rating_keys(self):
    """Run every stored builder and collect the rating keys it yields.

    Each ``(method, value)`` pair in ``self.builders`` is dispatched on a
    substring of the method name. Plex and Tautulli builders return rating
    keys directly. All other builders return ``(id, id_type)`` pairs that are
    mapped to rating keys through the library's ID maps. IDs that are absent
    from the library are recorded in ``self.missing_movies``,
    ``self.missing_shows`` or ``self.missing_parts``.

    Rating keys that pass ``self.check_filters`` are appended to
    ``self.rating_keys``; rejected ones are remembered in
    ``self.filtered_keys`` so they are not fetched and filtered again.
    """
    for method, value in self.builders:
        ids = []
        rating_keys = []
        logger.debug("")
        logger.debug(f"Builder: {method}: {value}")
        logger.info("")
        if "plex" in method:
            rating_keys = self.library.get_rating_keys(method, value)
        elif "tautulli" in method:
            rating_keys = self.library.Tautulli.get_rating_keys(self.library, value)
        elif "anidb" in method:
            anidb_ids = self.config.AniDB.get_anidb_ids(method, value, self.language)
            ids = self.config.Convert.anidb_to_ids(anidb_ids, self.library)
        elif "anilist" in method:
            anilist_ids = self.config.AniList.get_anilist_ids(method, value)
            ids = self.config.Convert.anilist_to_ids(anilist_ids, self.library)
        elif "mal" in method:
            mal_ids = self.config.MyAnimeList.get_mal_ids(method, value)
            ids = self.config.Convert.myanimelist_to_ids(mal_ids, self.library)
        elif "tvdb" in method:
            ids = self.config.TVDb.get_tvdb_ids(method, value)
        elif "imdb" in method:
            ids = self.config.IMDb.get_imdb_ids(method, value, self.language)
        elif "flixpatrol" in method:
            ids = self.config.FlixPatrol.get_flixpatrol_ids(method, value, self.language, self.library.is_movie)
        elif "icheckmovies" in method:
            ids = self.config.ICheckMovies.get_icheckmovies_ids(method, value, self.language)
        elif "letterboxd" in method:
            ids = self.config.Letterboxd.get_tmdb_ids(method, value, self.language)
        elif "stevenlu" in method:
            ids = self.config.StevenLu.get_stevenlu_ids(method)
        elif "tmdb" in method:
            ids = self.config.TMDb.get_tmdb_ids(method, value, self.library.is_movie)
        elif "trakt" in method:
            ids = self.config.Trakt.get_trakt_ids(method, value, self.library.is_movie)
        else:
            logger.error(f"{self.Type} Error: {method} method not supported")

        if len(ids) > 0:
            logger.debug("")
            logger.debug(f"{len(ids)} IDs Found: {ids}")
        total_ids = len(ids)
        if total_ids > 0:
            for i, input_data in enumerate(ids, 1):
                input_id, id_type = input_data
                util.print_return(f"Parsing ID {i}/{total_ids}")
                if id_type == "ratingKey":
                    rating_keys.append(input_id)
                elif id_type == "tmdb" and not self.parts_collection:
                    if input_id not in self.ignore_ids:
                        if input_id in self.library.movie_map:
                            rating_keys.extend(self.library.movie_map[input_id])
                        elif input_id not in self.missing_movies:
                            self.missing_movies.append(input_id)
                elif id_type in ["tvdb", "tmdb_show"] and not self.parts_collection:
                    if id_type == "tmdb_show":
                        # Shows are looked up by TVDb ID, so convert first.
                        try:
                            input_id = self.config.Convert.tmdb_to_tvdb(input_id, fail=True)
                        except Failed as e:
                            logger.error(e)
                            continue
                    if input_id not in self.ignore_ids:
                        if input_id in self.library.show_map:
                            rating_keys.extend(self.library.show_map[input_id])
                        elif input_id not in self.missing_shows:
                            self.missing_shows.append(input_id)
                elif id_type == "imdb" and not self.parts_collection:
                    if input_id not in self.ignore_imdb_ids:
                        if input_id in self.library.imdb_map:
                            rating_keys.extend(self.library.imdb_map[input_id])
                        else:
                            if self.do_missing:
                                try:
                                    tmdb_id, tmdb_type = self.config.Convert.imdb_to_tmdb(input_id, fail=True)
                                    if tmdb_type == "movie":
                                        if tmdb_id not in self.missing_movies:
                                            self.missing_movies.append(tmdb_id)
                                    else:
                                        tvdb_id = self.config.Convert.tmdb_to_tvdb(tmdb_id, fail=True)
                                        if tvdb_id not in self.missing_shows:
                                            self.missing_shows.append(tvdb_id)
                                except Failed as e:
                                    logger.error(e)
                                    continue
                elif id_type == "tvdb_season" and self.collection_level == "season":
                    # IDs are encoded as "<show id>_<season number>".
                    show_id, season_num = input_id.split("_")
                    show_id = int(show_id)
                    if show_id in self.library.show_map:
                        show_item = self.library.fetchItem(self.library.show_map[show_id][0])
                        try:
                            season_item = show_item.season(season=int(season_num))
                            rating_keys.append(season_item.ratingKey)
                        except NotFound:
                            self.missing_parts.append(f"{show_item.title} Season: {season_num} Missing")
                    elif show_id not in self.missing_shows:
                        self.missing_shows.append(show_id)
                elif id_type == "tvdb_episode" and self.collection_level == "episode":
                    # IDs are encoded as "<show id>_<season number>_<episode number>".
                    show_id, season_num, episode_num = input_id.split("_")
                    show_id = int(show_id)
                    if show_id in self.library.show_map:
                        show_item = self.library.fetchItem(self.library.show_map[show_id][0])
                        try:
                            episode_item = show_item.episode(season=int(season_num), episode=int(episode_num))
                            rating_keys.append(episode_item.ratingKey)
                        except NotFound:
                            self.missing_parts.append(f"{show_item.title} Season: {season_num} Episode: {episode_num} Missing")
                    elif show_id not in self.missing_shows:
                        self.missing_shows.append(show_id)
            util.print_end()

        # Normalise before measuring: the original called len() first, so a
        # scalar rating key raised TypeError before the isinstance guard ran.
        if not isinstance(rating_keys, list):
            rating_keys = [rating_keys] if rating_keys else []
        if len(rating_keys) > 0:
            name = self.obj.title if self.obj else self.name
            total = len(rating_keys)
            max_length = len(str(total))
            if (self.filters or self.tmdb_filters) and self.details["show_filtered"] is True:
                logger.info("")
                logger.info("Filtering Builder:")
            for i, key in enumerate(rating_keys, 1):
                if key not in self.rating_keys:
                    if key in self.filtered_keys:
                        if self.details["show_filtered"] is True:
                            logger.info(f"{name} Collection | X | {self.filtered_keys[key]}")
                    else:
                        try:
                            current = self.fetch_item(key)
                        except Failed as e:
                            logger.error(e)
                            continue
                        current_title = self.item_title(current)
                        # The counter is left-padded so progress lines align.
                        if self.check_filters(current, f"{(' ' * (max_length - len(str(i))))}{i}/{total}"):
                            self.rating_keys.append(key)
                        else:
                            self.filtered_keys[key] = current_title
                            if self.details["show_filtered"] is True:
                                logger.info(f"{name} Collection | X | {current_title}")
def build_filter(self, method, plex_filter, smart=False, type_override=None):
    """Turn a filter dictionary into a Plex filter URL query string.

    Args:
        method: name of the calling attribute, used in messages and in the
            first line of the description. It is split on ``_`` and must
            contain at least two parts.
        plex_filter: dictionary holding either a single ``any``/``all`` base or
            search attributes directly, plus optional ``sort_by``, ``limit``,
            ``validate`` and (smart filters on show libraries) ``type`` keys.
            Top-level keys are matched case-insensitively.
        smart: log the validation and default the sort to ``random`` instead
            of ``title.asc``.
        type_override: library type to use instead of deriving it.

    Returns:
        Tuple ``(type_key, filter_details, filter_url)``: the type key taken
        from ``plex.sort_types``, a multi-line description, and the query
        string.

    Raises:
        Failed: when the dictionary or one of its attributes is blank or
            invalid, or when no usable filter could be built.
    """
    if smart:
        logger.info("")
        logger.info(f"Validating Method: {method}")
    if plex_filter is None:
        raise Failed(f"{self.Type} Error: {method} attribute is blank")
    if not isinstance(plex_filter, dict):
        raise Failed(f"{self.Type} Error: {method} must be a dictionary: {plex_filter}")
    if smart:
        logger.debug(f"Value: {plex_filter}")

    # Maps the lower-cased key to the key as the user wrote it.
    filter_alias = {m.lower(): m for m in plex_filter}

    if "any" in filter_alias and "all" in filter_alias:
        raise Failed(f"{self.Type} Error: Cannot have more then one base")

    if type_override:
        sort_type = type_override
    elif smart and "type" in filter_alias and self.library.is_show:
        if plex_filter[filter_alias["type"]] not in ["shows", "seasons", "episodes"]:
            # The message used to say "season", which is not an accepted value.
            raise Failed(f"{self.Type} Error: type: {plex_filter[filter_alias['type']]} is invalid, must be either shows, seasons, or episodes")
        sort_type = plex_filter[filter_alias["type"]]
    elif self.library.is_show:
        sort_type = "shows"
    else:
        sort_type = "movies"

    ms = method.split("_")
    filter_details = f"{ms[0].capitalize()} {sort_type.capitalize()[:-1]} {ms[1].capitalize()}\n"
    type_key, sorts = plex.sort_types[sort_type]

    sort = "random" if smart else "title.asc"
    if "sort_by" in filter_alias:
        if plex_filter[filter_alias["sort_by"]] is None:
            raise Failed(f"{self.Type} Error: sort_by attribute is blank")
        if plex_filter[filter_alias["sort_by"]] not in sorts:
            raise Failed(f"{self.Type} Error: sort_by: {plex_filter[filter_alias['sort_by']]} is invalid")
        sort = plex_filter[filter_alias["sort_by"]]
    filter_details += f"Sort By: {sort}\n"

    limit = None
    if "limit" in filter_alias:
        limit_value = plex_filter[filter_alias["limit"]]
        if limit_value is None:
            raise Failed(f"{self.Type} Error: limit attribute is blank")
        # bool is a subclass of int, so "limit: true" must be rejected explicitly.
        if isinstance(limit_value, bool) or not isinstance(limit_value, int) or limit_value < 1:
            raise Failed(f"{self.Type} Error: limit attribute must be an integer greater then 0")
        limit = limit_value
        filter_details += f"Limit: {limit}\n"

    validate = True
    if "validate" in filter_alias:
        if plex_filter[filter_alias["validate"]] is None:
            raise Failed(f"{self.Type} Error: validate attribute is blank")
        if not isinstance(plex_filter[filter_alias["validate"]], bool):
            raise Failed(f"{self.Type} Error: validate attribute must be either true or false")
        validate = plex_filter[filter_alias["validate"]]
        filter_details += f"Validate: {validate}\n"

    def _filter(filter_dict, is_all=True, level=1):
        # Builds the URL fragment and display text for one group, recursing into any/all.
        output = ""
        display = f"\n{' ' * level}Match {'all' if is_all else 'any'} of the following:"
        level += 1
        indent = f"\n{' ' * level}"
        conjunction = f"{'and' if is_all else 'or'}=1&"
        for _key, _data in filter_dict.items():
            attr, modifier, final_attr = self._split(_key)

            def build_url_arg(arg, mod=None, arg_s=None, mod_s=None):
                # Returns ("key<mod>=value&", display line) for one search term.
                arg_key = plex.search_translation[attr] if attr in plex.search_translation else attr
                arg_key = plex.show_translation[arg_key] if self.library.is_show and arg_key in plex.show_translation else arg_key
                if mod is None:
                    mod = plex.modifier_translation[modifier] if modifier in plex.modifier_translation else modifier
                if arg_s is None:
                    arg_s = arg
                if attr in string_filters and modifier in ["", ".not"]:
                    mod_s = "does not contain" if modifier == ".not" else "contains"
                elif mod_s is None:
                    mod_s = util.mod_displays[modifier]
                param_s = plex.search_display[attr] if attr in plex.search_display else attr.title().replace('_', ' ')
                display_line = f"{indent}{param_s} {mod_s} {arg_s}"
                return f"{arg_key}{mod}={arg}&", display_line

            if final_attr not in plex.searches and not final_attr.startswith(("any", "all")):
                raise Failed(f"{self.Type} Error: {final_attr} is not a valid {method} attribute")
            elif final_attr in plex.movie_only_searches and self.library.is_show:
                raise Failed(f"{self.Type} Error: {final_attr} {method} attribute only works for movie libraries")
            elif final_attr in plex.show_only_searches and self.library.is_movie:
                raise Failed(f"{self.Type} Error: {final_attr} {method} attribute only works for show libraries")
            elif _data is None:
                raise Failed(f"{self.Type} Error: {final_attr} {method} attribute is blank")
            elif final_attr.startswith(("any", "all")):
                dicts = util.get_list(_data)
                results = ""
                display_add = ""
                for dict_data in dicts:
                    if not isinstance(dict_data, dict):
                        raise Failed(f"{self.Type} Error: {attr} must be either a dictionary or list of dictionaries")
                    inside_filter, inside_display = _filter(dict_data, is_all=attr == "all", level=level)
                    if len(inside_filter) > 0:
                        display_add += inside_display
                        results += f"{conjunction if len(results) > 0 else ''}push=1&{inside_filter}pop=1&"
            else:
                validation = self.validate_attribute(attr, modifier, final_attr, _data, validate, pairs=True)
                if validation is None:
                    continue
                elif attr in plex.date_attributes and modifier in ["", ".not"]:
                    last_text = "is not in the last" if modifier == ".not" else "is in the last"
                    last_mod = "%3E%3E" if modifier == "" else "%3C%3C"
                    results, display_add = build_url_arg(f"-{validation}d", mod=last_mod, arg_s=f"{validation} Days", mod_s=last_text)
                elif attr == "duration" and modifier in [".gt", ".gte", ".lt", ".lte"]:
                    # validation is multiplied by 60000 (minutes -> milliseconds, presumably; confirm against Plex).
                    results, display_add = build_url_arg(validation * 60000)
                elif attr in plex.boolean_attributes:
                    bool_mod = "" if validation else "!"
                    bool_arg = "true" if validation else "false"
                    results, display_add = build_url_arg(1, mod=bool_mod, arg_s=bool_arg, mod_s="is")
                elif (attr in ["title", "episode_title", "studio", "decade", "year", "episode_year"] or attr in plex.tags) and modifier in ["", ".is", ".isnot", ".not", ".begins", ".ends"]:
                    results = ""
                    display_add = ""
                    for og_value, result in validation:
                        built_arg = build_url_arg(quote(str(result)) if attr in string_filters else result, arg_s=og_value)
                        display_add += built_arg[1]
                        results += f"{conjunction if len(results) > 0 else ''}{built_arg[0]}"
                else:
                    results, display_add = build_url_arg(validation)
            if not results:
                # Nothing was produced (e.g. every value failed with validate
                # off); appending would leave a dangling conjunction in the URL.
                continue
            display += display_add
            output += f"{conjunction if len(output) > 0 else ''}{results}"
        return output, display

    if "any" not in filter_alias and "all" not in filter_alias:
        # No explicit base: gather the top-level search attributes into an implicit "all".
        base_dict = {}
        any_dicts = []
        for alias_key, alias_value in filter_alias.items():
            _, _, final = self._split(alias_key)
            if final in plex.and_searches:
                # Drops the last four characters of the user's key.
                base_dict[alias_value[:-4]] = plex_filter[alias_value]
            elif final in plex.or_searches:
                any_dicts.append({alias_value: plex_filter[alias_value]})
            elif final in plex.searches:
                base_dict[alias_value] = plex_filter[alias_value]
        if len(any_dicts) > 0:
            base_dict["any"] = any_dicts
        base_all = True
        if len(base_dict) == 0:
            raise Failed(f"{self.Type} Error: Must have either any or all as a base for {method}")
    else:
        base = "all" if "all" in filter_alias else "any"
        base_all = base == "all"
        if plex_filter[filter_alias[base]] is None:
            raise Failed(f"{self.Type} Error: {base} attribute is blank")
        if not isinstance(plex_filter[filter_alias[base]], dict):
            raise Failed(f"{self.Type} Error: {base} must be a dictionary: {plex_filter[filter_alias[base]]}")
        base_dict = plex_filter[filter_alias[base]]
    built_filter, filter_text = _filter(base_dict, is_all=base_all)
    filter_details = f"{filter_details}Filter:{filter_text}"
    if len(built_filter) > 0:
        # An "all" base only needs its trailing "&" removed; an "any" base is wrapped in push/pop.
        final_filter = built_filter[:-1] if base_all else f"push=1&{built_filter}pop=1"
        filter_url = f"?type={type_key}&{f'limit={limit}&' if limit else ''}sort={sorts[sort]}&{final_filter}"
    else:
        raise Failed(f"{self.Type} Error: No Filter Created")

    return type_key, filter_details, filter_url
def validate_attribute(self, attribute, modifier, final, data, validate, pairs=False):
    """Validate the value of one filter or search attribute.

    Args:
        attribute: attribute name without its modifier.
        modifier: modifier suffix such as ``""``, ``".not"``, ``".gt"`` or
            ``".regex"``.
        final: display name used in error messages and passed to the parsers.
        data: raw value supplied by the user.
        validate: when true an invalid regex or unknown tag raises ``Failed``;
            when false it is logged and dropped.
        pairs: when true, string, tag and year lists are returned as
            ``(original, resolved)`` tuples instead of plain values.

    Returns:
        The validated value; its type depends on the attribute/modifier pair.

    Raises:
        Failed: when the value is invalid or the combination is unsupported.
    """
    def smart_pair(list_to_pair):
        return [(t, t) for t in list_to_pair] if pairs else list_to_pair

    if modifier == ".regex":
        valid_regex = []
        for reg in util.get_list(data, split=False):
            try:
                re.compile(reg)
                valid_regex.append(reg)
            except re.error:
                util.print_stacktrace()
                err = f"{self.Type} Error: Regular Expression Invalid: {reg}"
                if validate:
                    raise Failed(err)
                else:
                    logger.error(err)
        return valid_regex
    elif attribute in ["title", "studio", "episode_title", "audio_track_title"] and modifier in ["", ".not", ".is", ".isnot", ".begins", ".ends"]:
        return smart_pair(util.get_list(data, split=False))
    elif attribute == "original_language":
        return util.get_list(data, lower=True)
    elif attribute == "filepath":
        return util.get_list(data)
    elif attribute == "history":
        try:
            return self._parse(final, data, datatype="int", maximum=30)
        except Failed:
            # Not a number: the only other accepted values are "day" and "month".
            if str(data).lower() in ["day", "month"]:
                return str(data).lower()
            raise Failed(f"{self.Type} Error: history attribute invalid: {data} must be a number between 1-30, day, or month")
    elif attribute in plex.tags and modifier in ["", ".not"]:
        if attribute in plex.tmdb_attributes:
            final_values = []
            for value in util.get_list(data):
                # str() keeps non-string values (e.g. a numeric tag) from
                # raising AttributeError on .lower().
                if str(value).lower() == "tmdb" and "tmdb_person" in self.details:
                    # "tmdb" is replaced by the names stored in details["tmdb_person"].
                    final_values.extend(self.details["tmdb_person"])
                else:
                    final_values.append(value)
        else:
            final_values = util.get_list(data)
        search_choices = self.library.get_search_choices(attribute, title=not pairs)
        valid_list = []
        for value in final_values:
            choice_key = str(value).lower()
            if choice_key in search_choices:
                if pairs:
                    valid_list.append((value, search_choices[choice_key]))
                else:
                    valid_list.append(search_choices[choice_key])
            else:
                error = f"Plex Error: {attribute}: {value} not found"
                if validate:
                    raise Failed(error)
                else:
                    logger.error(error)
        return valid_list
    elif attribute in ["year", "episode_year", "tmdb_year"] and modifier in [".gt", ".gte", ".lt", ".lte"]:
        return self._parse(final, data, datatype="int", minimum=1800, maximum=self.current_year)
    elif attribute in plex.date_attributes and modifier in [".before", ".after"]:
        return util.validate_date(data, final, return_as="%Y-%m-%d")
    elif attribute in plex.number_attributes and modifier in ["", ".not", ".gt", ".gte", ".lt", ".lte"]:
        return self._parse(final, data, datatype="int")
    elif attribute in plex.float_attributes and modifier in [".gt", ".gte", ".lt", ".lte"]:
        return self._parse(final, data, datatype="float", minimum=0, maximum=10)
    elif attribute in ["decade", "year", "episode_year", "tmdb_year"] and modifier in ["", ".not"]:
        final_years = []
        for value in util.get_list(data):
            final_years.append(self._parse(final, value, datatype="int", minimum=1800, maximum=self.current_year))
        return smart_pair(final_years)
    elif attribute in plex.boolean_attributes + ["has_collection"]:
        return self._parse(attribute, data, datatype="bool")
    else:
        raise Failed(f"{self.Type} Error: {final} attribute not supported")
def _split(self, text):
    """Split ``text`` into ``(attribute, modifier, final)``.

    The text is lower-cased and split at its last dot. Attribute and modifier
    are then mapped through ``method_alias`` and ``modifier_alias``. The generic
    ``add_to_arr``/``arr_tag``/``arr_folder`` names become their Radarr or
    Sonarr form depending on the library type, and comparison modifiers on date
    attributes become ``.after``/``.before``. A warning is logged whenever the
    resulting name differs from the text that was passed in.
    """
    attribute, modifier = os.path.splitext(str(text).lower())
    attribute = method_alias.get(attribute, attribute)
    modifier = modifier_alias.get(modifier, modifier)
    if attribute == "add_to_arr":
        attribute = "radarr_add" if self.library.is_movie else "sonarr_add"
    elif attribute in ["arr_tag", "arr_folder"]:
        # "rad" + "arr_tag" -> "radarr_tag", "son" + "arr_tag" -> "sonarr_tag".
        arr_prefix = "rad" if self.library.is_movie else "son"
        attribute = f"{arr_prefix}{attribute}"
    elif attribute in plex.date_attributes:
        if modifier in [".gt", ".gte"]:
            modifier = ".after"
        elif modifier in [".lt", ".lte"]:
            modifier = ".before"
    final = f"{attribute}{modifier}"
    if text != final:
        logger.warning(f"Collection Warning: {text} attribute will run as {final}")
    return attribute, modifier, final
def fetch_item(self, item):
    """Return the library item for ``item``.

    Args:
        item: a Movie/Show/Season/Episode object, whose ``ratingKey`` is used,
            or a value convertible to an integer rating key.

    Returns:
        The Movie, Show, Season or Episode returned by ``self.library.fetchItem``.

    Raises:
        Failed: when the key is not an integer, the library reports
            ``BadRequest``/``NotFound``, or the fetched object is of another type.
    """
    plex_types = (Movie, Show, Season, Episode)
    try:
        key = item.ratingKey if isinstance(item, plex_types) else int(item)
    except (TypeError, ValueError) as e:
        # Callers only catch Failed, so a malformed key must not escape as ValueError.
        raise Failed(f"Plex Error: Item {item} not found") from e
    try:
        current = self.library.fetchItem(key)
    except (BadRequest, NotFound) as e:
        raise Failed(f"Plex Error: Item {item} not found") from e
    if not isinstance(current, plex_types):
        raise Failed(f"Plex Error: Item {item} not found")
    return current
def add_to_collection(self):
    """Add every item in ``self.rating_keys`` that is not yet in the collection.

    Items already present are marked ``None`` in ``self.plex_map``. Items that
    cannot be fetched are logged and skipped. When
    ``details["collection_changes_webhooks"]`` is set, each addition is recorded
    in ``self.notification_additions``.

    Returns:
        The number of items added.
    """
    name, collection_items = self.library.get_collection_name_and_items(self.obj if self.obj else self.name, self.smart_label_collection)
    total = len(self.rating_keys)
    amount_added = 0
    for item in self.rating_keys:
        try:
            current = self.fetch_item(item)
        except Failed as e:
            logger.error(e)
            continue
        # One membership test serves both the log line and the branch below.
        already_present = current in collection_items
        current_operation = "=" if already_present else "+"
        logger.info(util.adjust_space(f"{name} Collection | {current_operation} | {self.item_title(current)}"))
        if already_present:
            self.plex_map[current.ratingKey] = None
        else:
            self.library.alter_collection(current, name, smart_label_collection=self.smart_label_collection)
            amount_added += 1
            if self.details["collection_changes_webhooks"]:
                if self.library.is_movie and current.ratingKey in self.library.movie_rating_key_map:
                    add_id = self.library.movie_rating_key_map[current.ratingKey]
                elif self.library.is_show and current.ratingKey in self.library.show_rating_key_map:
                    add_id = self.library.show_rating_key_map[current.ratingKey]
                else:
                    add_id = None
                self.notification_additions.append({"title": current.title, "id": add_id})
    util.print_end()
    logger.info("")
    # Singular only for exactly one item; the original printed "0 Movie Processed".
    logger.info(f"{total} {self.collection_level.capitalize()}{'s' if total != 1 else ''} Processed")
    return amount_added
def sync_collection(self):
    """Remove from the collection every item still set in ``self.plex_map``.

    Entries whose value is ``None`` are left alone. Each remaining item is
    reloaded, removed from the collection and, when
    ``details["collection_changes_webhooks"]`` is set, recorded in
    ``self.notification_removals``.

    Returns:
        The number of items removed.
    """
    amount_removed = 0
    for item in self.plex_map.values():
        if item is None:
            continue
        if amount_removed == 0:
            # Print the section header once, before the first removal.
            logger.info("")
            util.separator(f"Removed from {self.name} Collection", space=False, border=False)
            logger.info("")
        self.library.reload(item)
        logger.info(f"{self.name} Collection | - | {self.item_title(item)}")
        self.library.alter_collection(item, self.name, smart_label_collection=self.smart_label_collection, add=False)
        if self.details["collection_changes_webhooks"]:
            if self.library.is_movie and item.ratingKey in self.library.movie_rating_key_map:
                remove_id = self.library.movie_rating_key_map[item.ratingKey]
            elif self.library.is_show and item.ratingKey in self.library.show_rating_key_map:
                remove_id = self.library.show_rating_key_map[item.ratingKey]
            else:
                remove_id = None
            self.notification_removals.append({"title": item.title, "id": remove_id})
        amount_removed += 1
    if amount_removed > 0:
        logger.info("")
        # The original test was inverted and printed "1 Movies" / "2 Movie".
        logger.info(f"{amount_removed} {self.collection_level.capitalize()}{'s' if amount_removed != 1 else ''} Removed")
    return amount_removed
def check_tmdb_filter(self, item_id, is_movie, item=None, check_released=False):
    """Tell whether a TMDb item passes ``self.tmdb_filters``.

    Args:
        item_id: TMDb movie ID when ``is_movie`` is true; otherwise a TVDb ID
            that is converted with ``self.config.Convert.tvdb_to_tmdb``.
        is_movie: selects the movie or the show lookup and date attributes.
        item: an already fetched TMDb object; looked up when ``None``.
        check_released: also reject items whose release / first air date is
            later than ``self.current_time``.

    Returns:
        bool: ``False`` when any filter rejects the item or when a ``Failed``
        is raised while evaluating, ``True`` otherwise.
    """
    if not (self.tmdb_filters or check_released):
        return True
    try:
        if item is None:
            if is_movie:
                item = self.config.TMDb.get_movie(item_id)
            else:
                item = self.config.TMDb.get_show(self.config.Convert.tvdb_to_tmdb(item_id))
        if check_released:
            premiere = item.release_date if is_movie else item.first_air_date
            if util.validate_date(premiere, "") > self.current_time:
                return False
        for filter_method, filter_data in self.tmdb_filters:
            filter_attr, modifier, filter_final = self._split(filter_method)
            if filter_attr == "original_language":
                if modifier == ".not":
                    rejected = item.original_language in filter_data
                elif modifier == "":
                    rejected = item.original_language not in filter_data
                else:
                    rejected = False
                if rejected:
                    return False
            elif filter_attr in ["first_episode_aired", "last_episode_aired"]:
                if filter_attr == "first_episode_aired":
                    tmdb_date = util.validate_date(item.first_air_date, "TMDB First Air Date")
                else:
                    tmdb_date = util.validate_date(item.last_air_date, "TMDB Last Air Date")
                if util.is_date_filter(tmdb_date, modifier, filter_data, filter_final, self.current_time):
                    return False
            elif modifier in [".gt", ".gte", ".lt", ".lte"]:
                # Stays None when the attribute is unknown or has no value.
                attr = None
                if filter_attr == "tmdb_vote_count":
                    attr = item.vote_count
                elif filter_attr == "tmdb_year":
                    if is_movie:
                        attr = item.year
                    else:
                        air_date = item.first_air_date
                        if air_date:
                            attr = util.validate_date(air_date, "TMDb Year Filter").year
                if util.is_number_filter(attr, modifier, filter_data):
                    return False
    except Failed:
        return False
    return True
def check_filters(self, current, display):
    """Tell whether the Plex item ``current`` passes every configured filter.

    Filtering is skipped (the item passes) when neither ``self.filters`` nor
    ``self.tmdb_filters`` is set, or when ``only_filter_missing`` is enabled.

    Args:
        current: the Plex item to test.
        display: text inserted in the "Filtering ..." progress message.

    Returns:
        bool: ``True`` when the item passes, ``False`` when a filter rejects it.

    Raises:
        Failed: when a filter attribute is not handled by any branch.
    """
    if (self.filters or self.tmdb_filters) and not self.details["only_filter_missing"]:
        util.print_return(f"Filtering {display} {current.title}")
        if self.tmdb_filters:
            movie_ids = self.library.movie_rating_key_map
            show_ids = self.library.show_rating_key_map
            in_movies = current.ratingKey in movie_ids
            if not in_movies and current.ratingKey not in show_ids:
                logger.warning(f"Filter Error: No {'TMDb' if self.library.is_movie else 'TVDb'} ID found for {current.title}")
                return False
            t_id = movie_ids[current.ratingKey] if in_movies else show_ids[current.ratingKey]
            if not self.check_tmdb_filter(t_id, in_movies):
                return False
        for filter_method, filter_data in self.filters:
            filter_attr, modifier, filter_final = self._split(filter_method)
            filter_actual = filter_translation[filter_attr] if filter_attr in filter_translation else filter_attr
            if filter_attr in ["release", "added", "last_played"]:
                if util.is_date_filter(getattr(current, filter_actual), modifier, filter_data, filter_final, self.current_time):
                    return False
            elif filter_attr in ["audio_track_title", "filepath", "title", "studio"]:
                values = []
                if filter_attr == "audio_track_title":
                    for media in current.media:
                        for part in media.parts:
                            values.extend([a.title for a in part.audioStreams() if a.title])
                elif filter_attr == "filepath":
                    values = list(current.locations)
                elif filter_attr in ["title", "studio"]:
                    values = [getattr(current, filter_actual)]
                if util.is_string_filter(values, modifier, filter_data):
                    return False
            elif filter_attr == "has_collection":
                if util.is_boolean_filter(filter_data, len(current.collections) > 0):
                    return False
            elif filter_attr == "history":
                item_date = current.originallyAvailableAt
                if item_date is None:
                    return False
                elif filter_data == "day":
                    if item_date.month != self.current_time.month or item_date.day != self.current_time.day:
                        return False
                elif filter_data == "month":
                    if item_date.month != self.current_time.month:
                        return False
                else:
                    # Otherwise filter_data is a day count: the item passes when
                    # its month/day matches any of the last filter_data days.
                    window = (self.current_time - timedelta(days=i) for i in range(filter_data))
                    if not any(item_date.month == day.month and item_date.day == day.day for day in window):
                        return False
            elif modifier in [".gt", ".gte", ".lt", ".lte"]:
                divider = 60000 if filter_attr == "duration" else 1
                number = getattr(current, filter_actual)
                # An unset attribute is None and cannot be divided; hand None
                # to is_number_filter instead (check_tmdb_filter passes None
                # to it for unknown values as well).
                if number is not None:
                    number = number / divider
                if util.is_number_filter(number, modifier, filter_data):
                    return False
            else:
                attrs = []
                if filter_attr in ["resolution", "audio_language", "subtitle_language"]:
                    for media in current.media:
                        if filter_attr == "resolution":
                            attrs.append(media.videoResolution)
                        for part in media.parts:
                            if filter_attr == "audio_language":
                                attrs.extend([a.language for a in part.audioStreams()])
                            if filter_attr == "subtitle_language":
                                attrs.extend([s.language for s in part.subtitleStreams()])
                elif filter_attr in ["content_rating", "year", "rating"]:
                    attrs = [str(getattr(current, filter_actual))]
                elif filter_attr in ["actor", "country", "director", "genre", "label", "producer", "writer", "collection"]:
                    attrs = [attr.tag for attr in getattr(current, filter_actual)]
                else:
                    raise Failed(f"Filter Error: filter: {filter_final} not supported")
                overlap = set(filter_data) & set(attrs)
                # Plain filters need at least one match; ".not" filters need none.
                if (not overlap and modifier == "") or (overlap and modifier == ".not"):
                    return False
        util.print_return(f"Filtering {display} {current.title}")
    return True
def run_missing(self):
    """Report the collection's missing movies/shows and optionally send them on.

    For ``self.missing_movies`` (TMDb IDs) and, in show libraries,
    ``self.missing_shows`` (TVDb IDs): looks each one up, applies
    ``check_tmdb_filter`` (honouring ``missing_only_released``), logs the
    result, then depending on configuration saves the list with
    ``self.library.add_missing``, adds the IDs to Radarr/Sonarr, edits their
    tags and queues them in ``self.run_again_movies`` /
    ``self.run_again_shows``. ``self.missing_parts`` is only logged.

    Returns:
        tuple: ``(added_to_radarr, added_to_sonarr)``.
    """
    added_to_radarr = 0
    added_to_sonarr = 0
    if self.missing_movies:
        missing_movies_with_names = []
        for missing_id in self.missing_movies:
            try:
                movie = self.config.TMDb.get_movie(missing_id)
            except Failed as e:
                logger.error(e)
                continue
            current_title = f"{movie.title} ({util.validate_date(movie.release_date, 'test').year})" if movie.release_date else movie.title
            if self.check_tmdb_filter(missing_id, True, item=movie, check_released=self.details["missing_only_released"]):
                missing_movies_with_names.append((current_title, missing_id))
                if self.details["show_missing"] is True:
                    logger.info(f"{self.name} Collection | ? | {current_title} (TMDb: {missing_id})")
            elif self.details["show_filtered"] is True and self.details["show_missing"] is True:
                logger.info(f"{self.name} Collection | X | {current_title} (TMDb: {missing_id})")
        logger.info("")
        # "!= 1" so that a count of zero reads "0 Movies Missing".
        logger.info(f"{len(missing_movies_with_names)} Movie{'s' if len(missing_movies_with_names) != 1 else ''} Missing")
        if missing_movies_with_names:
            if self.details["save_missing"] is True:
                self.library.add_missing(self.name, missing_movies_with_names, True)
            if self.run_again or (self.library.Radarr and (self.radarr_details["add"] or "item_radarr_tag" in self.item_details)):
                missing_tmdb_ids = [missing_id for _, missing_id in missing_movies_with_names]
                if self.library.Radarr:
                    if self.radarr_details["add"]:
                        try:
                            added_to_radarr += self.library.Radarr.add_tmdb(missing_tmdb_ids, **self.radarr_details)
                        except Failed as e:
                            logger.error(e)
                    if "item_radarr_tag" in self.item_details:
                        try:
                            self.library.Radarr.edit_tags(missing_tmdb_ids, self.item_details["item_radarr_tag"], self.item_details["apply_tags"])
                        except Failed as e:
                            logger.error(e)
                if self.run_again:
                    self.run_again_movies.extend(missing_tmdb_ids)
    if self.missing_shows and self.library.is_show:
        missing_shows_with_names = []
        for missing_id in self.missing_shows:
            try:
                show = self.config.TVDb.get_series(missing_id)
            except Failed as e:
                logger.error(e)
                continue
            # No item is passed here, so check_tmdb_filter does its own lookup.
            if self.check_tmdb_filter(missing_id, False, check_released=self.details["missing_only_released"]):
                missing_shows_with_names.append((show.title, missing_id))
                if self.details["show_missing"] is True:
                    # Label spelled "TVDb" to match the filtered line below.
                    logger.info(f"{self.name} Collection | ? | {show.title} (TVDb: {missing_id})")
            elif self.details["show_filtered"] is True and self.details["show_missing"] is True:
                logger.info(f"{self.name} Collection | X | {show.title} (TVDb: {missing_id})")
        logger.info("")
        logger.info(f"{len(missing_shows_with_names)} Show{'s' if len(missing_shows_with_names) != 1 else ''} Missing")
        if missing_shows_with_names:
            if self.details["save_missing"] is True:
                self.library.add_missing(self.name, missing_shows_with_names, False)
            if self.run_again or (self.library.Sonarr and (self.sonarr_details["add"] or "item_sonarr_tag" in self.item_details)):
                missing_tvdb_ids = [missing_id for _, missing_id in missing_shows_with_names]
                if self.library.Sonarr:
                    if self.sonarr_details["add"]:
                        try:
                            added_to_sonarr += self.library.Sonarr.add_tvdb(missing_tvdb_ids, **self.sonarr_details)
                        except Failed as e:
                            logger.error(e)
                    if "item_sonarr_tag" in self.item_details:
                        try:
                            self.library.Sonarr.edit_tags(missing_tvdb_ids, self.item_details["item_sonarr_tag"], self.item_details["apply_tags"])
                        except Failed as e:
                            logger.error(e)
                if self.run_again:
                    self.run_again_shows.extend(missing_tvdb_ids)
    if self.missing_parts and self.library.is_show and self.details["save_missing"] is True:
        for missing in self.missing_parts:
            logger.info(f"{self.name} Collection | X | {missing}")
    return added_to_radarr, added_to_sonarr
def item_title(self, item):
    """Build the display title of ``item`` for the current collection level.

    Seasons and episodes are prefixed with their show (and season/episode
    numbers); a season or parent-season title is spelled out only when it
    differs from the default "Season N". Movies with a year get it appended
    in parentheses; anything else returns ``item.title`` unchanged.
    """
    level = self.collection_level
    if level == "season":
        if item.title == f"Season {item.index}":
            return f"{item.parentTitle} {item.title}"
        return f"{item.parentTitle} Season {item.index}: {item.title}"
    if level == "episode":
        text = f"{item.grandparentTitle} S{util.add_zero(item.parentIndex)}E{util.add_zero(item.index)}"
        if item.parentTitle == f"Season {item.parentIndex}":
            return f"{text}: {item.title}"
        return f"{text}: {item.parentTitle}: {item.title}"
    if level == "movie" and item.year:
        return f"{item.title} ({item.year})"
    return item.title
def load_collection_items(self):
    """Fill ``self.items`` with the items the item-detail updates work on.

    When a collection is being built and ``self.obj`` exists, the items come
    from ``self.library.get_collection_items``. When no collection is built,
    each key of ``self.rating_keys`` is fetched and appended; fetch failures
    are logged and skipped.

    Raises:
        Failed: when ``self.items`` is empty afterwards.
    """
    if self.build_collection:
        if self.obj:
            self.items = self.library.get_collection_items(self.obj, self.smart_label_collection)
    else:
        logger.info("")
        util.separator(f"Items Found for {self.name} Collection", space=False, border=False)
        logger.info("")
        for rk in self.rating_keys:
            try:
                found = self.fetch_item(rk)
            except Failed as e:
                logger.error(e)
            else:
                logger.info(f"{found.title} (Rating Key: {rk})")
                self.items.append(found)
    if not self.items:
        raise Failed("Plex Error: No Collection items found")
def update_item_details(self):
    """Apply the configured ``item_*`` details to every item in ``self.items``.

    Per item: updates assets/overlay, edits labels, collects the Radarr /
    Sonarr path, applies advanced Plex settings, optionally renames seasons
    from TMDb, and then locks/refreshes as configured. After the loop the
    collected IDs are tagged in and/or added to Radarr and Sonarr. Finally,
    items that still carry the "<overlay> Overlay" label but were not
    processed (or all of them when ``revert_overlay`` is set) lose the label
    and get their saved original poster re-uploaded.
    """
    logger.info("")
    util.separator(f"Updating Details of the Items in {self.name} Collection", space=False, border=False)
    logger.info("")
    overlay = None
    overlay_folder = None
    overlay_name = ""
    rating_keys = []
    if "item_overlay" in self.item_details:
        overlay_name = self.item_details["item_overlay"]
        if self.config.Cache:
            cache_keys = self.config.Cache.query_image_map_overlay(self.library.image_table_name, overlay_name)
            if cache_keys:
                for rating_key in cache_keys:
                    try:
                        item = self.fetch_item(rating_key)
                    except Failed as e:
                        logger.error(e)
                        continue
                    if isinstance(item, (Movie, Show)):
                        self.library.edit_tags("label", item, add_tags=[f"{overlay_name} Overlay"])
                self.config.Cache.update_remove_overlay(self.library.image_table_name, overlay_name)
        rating_keys = [int(labeled.ratingKey) for labeled in self.library.get_labeled_items(f"{overlay_name} Overlay")]
        overlay_folder = os.path.join(self.config.default_dir, "overlays", overlay_name)
        # convert() returns an independent image, so the source file handle
        # can be closed as soon as the conversion is done.
        with Image.open(os.path.join(overlay_folder, "overlay.png")) as overlay_file:
            overlay_image = overlay_file.convert("RGBA")
        temp_image = os.path.join(overlay_folder, "temp.png")
        overlay = (overlay_name, overlay_folder, overlay_image, temp_image)
    revert = "revert_overlay" in self.item_details
    if revert:
        overlay = None
    add_tags = self.item_details["item_label"] if "item_label" in self.item_details else None
    remove_tags = self.item_details["item_label.remove"] if "item_label.remove" in self.item_details else None
    sync_tags = self.item_details["item_label.sync"] if "item_label.sync" in self.item_details else None
    tmdb_paths = []
    tvdb_paths = []
    for item in self.items:
        # Keys left in rating_keys after this loop are cleaned up at the end.
        if int(item.ratingKey) in rating_keys and not revert:
            rating_keys.remove(int(item.ratingKey))
        if "item_assets" in self.item_details or overlay is not None:
            try:
                self.library.update_item_from_assets(item, overlay=overlay)
            except Failed as e:
                logger.error(e)
        self.library.edit_tags("label", item, add_tags=add_tags, remove_tags=remove_tags, sync_tags=sync_tags)
        path = os.path.dirname(str(item.locations[0])) if self.library.is_movie else str(item.locations[0])
        if self.library.Radarr and item.ratingKey in self.library.movie_rating_key_map:
            path = path.replace(self.library.Radarr.plex_path, self.library.Radarr.radarr_path)
            path = path[:-1] if path.endswith(('/', '\\')) else path
            tmdb_paths.append((self.library.movie_rating_key_map[item.ratingKey], path))
        if self.library.Sonarr and item.ratingKey in self.library.show_rating_key_map:
            path = path.replace(self.library.Sonarr.plex_path, self.library.Sonarr.sonarr_path)
            path = path[:-1] if path.endswith(('/', '\\')) else path
            tvdb_paths.append((self.library.show_rating_key_map[item.ratingKey], path))
        advance_edits = {}
        for method_name, method_data in self.item_details.items():
            if method_name in plex.item_advance_keys:
                key, options = plex.item_advance_keys[method_name]
                # Only queue settings whose current value differs.
                if getattr(item, key) != options[method_data]:
                    advance_edits[key] = options[method_data]
        self.library.edit_item(item, item.title, self.collection_level.capitalize(), advance_edits, advanced=True)
        if "item_tmdb_season_titles" in self.item_details and item.ratingKey in self.library.show_rating_key_map:
            try:
                tmdb_id = self.config.Convert.tvdb_to_tmdb(self.library.show_rating_key_map[item.ratingKey])
                names = {str(s.season_number): s.name for s in self.config.TMDb.get_show(tmdb_id).seasons}
                for season in self.library.query(item.seasons):
                    if str(season.index) in names:
                        self.library.edit_query(season, {"title.locked": 1, "title.value": names[str(season.index)]})
            except Failed as e:
                logger.error(e)
        # Locking should come before refreshing since refreshing can change metadata (i.e. if specified to both lock
        # background/poster and also refreshing, assume that the current background/poster should be kept)
        if "item_lock_background" in self.item_details:
            self.library.query(item.lockArt)
        if "item_lock_poster" in self.item_details:
            self.library.query(item.lockPoster)
        if "item_lock_title" in self.item_details:
            self.library.edit_query(item, {"title.locked": 1})
        if "item_refresh" in self.item_details:
            self.library.query(item.refresh)
    if self.library.Radarr and tmdb_paths:
        if "item_radarr_tag" in self.item_details:
            self.library.Radarr.edit_tags([tmdb_id for tmdb_id, _ in tmdb_paths], self.item_details["item_radarr_tag"], self.item_details["apply_tags"])
        if self.radarr_details["add_existing"]:
            self.library.Radarr.add_tmdb(tmdb_paths, **self.radarr_details)
    if self.library.Sonarr and tvdb_paths:
        if "item_sonarr_tag" in self.item_details:
            self.library.Sonarr.edit_tags([tvdb_id for tvdb_id, _ in tvdb_paths], self.item_details["item_sonarr_tag"], self.item_details["apply_tags"])
        if self.sonarr_details["add_existing"]:
            self.library.Sonarr.add_tvdb(tvdb_paths, **self.sonarr_details)
    # rating_keys is only non-empty when an overlay was configured above, so
    # overlay_folder is set whenever this loop runs.
    for rating_key in rating_keys:
        try:
            item = self.fetch_item(rating_key)
        except Failed as e:
            logger.error(e)
            continue
        self.library.edit_tags("label", item, remove_tags=[f"{overlay_name} Overlay"])
        og_image = os.path.join(overlay_folder, f"{rating_key}.png")
        if os.path.exists(og_image):
            self.library.upload_file_poster(item, og_image)
            os.remove(og_image)
            # The cache is optional (see the guard at the top); skip the
            # image-map reset when it is disabled.
            if self.config.Cache:
                self.config.Cache.update_image_map(item.ratingKey, self.library.image_table_name, "", "")
def delete_collection(self):
    """Delete the Plex collection through the library, if one is loaded."""
    if not self.obj:
        return
    self.library.query(self.obj.delete)
def load_collection(self):
    """Create the smart collection if needed, then load it into ``self.obj``.

    A smart collection is created from ``self.smart_url`` when no object is
    loaded yet. Otherwise, for smart label collections, the URL is rebuilt
    with ``self.library.smart_label_url`` and the collection is created when
    it does not exist. ``self.created`` is set when ``self.exists`` is false.

    Raises:
        Failed: when building the smart label URL or creating the smart
            label collection fails.
    """
    if self.smart_url and not self.obj:
        self.library.create_smart_collection(self.name, self.smart_type_key, self.smart_url)
    elif self.smart_label_collection:
        try:
            label_info = self.library.smart_label_url(self.name, self.smart_sort)
            smart_type, self.smart_url = label_info
            if not self.obj:
                self.library.create_smart_collection(self.name, smart_type, self.smart_url)
        except Failed:
            raise Failed(f"{self.Type} Error: Label: {self.name} was not added to any items in the Library")
    self.obj = self.library.get_collection(self.name)
    if not self.exists:
        self.created = True
def update_details(self):
    """Push the configured collection-level details to the Plex collection.

    In order: updates the smart filter when it changed, picks a summary from
    ``self.summaries``, queues summary / sort title / content rating edits,
    updates collection mode, order and visibility, edits labels, sends the
    queued edits, then chooses a poster and a background (including images
    found in the asset directory) and uploads them. The chosen images are
    stored in ``self.collection_poster`` / ``self.collection_background``,
    which stay ``None`` when nothing was found.
    """
    logger.info("")
    util.separator(f"Updating Details of {self.name} Collection", space=False, border=False)
    logger.info("")
    if self.smart_url and self.smart_url != self.library.smart_filter(self.obj):
        self.library.update_smart_collection(self.obj, self.smart_url)
        logger.info(f"Detail: Smart Filter updated to {self.smart_url}")
    edits = {}

    # Sources are listed from highest to lowest priority; the first one
    # present wins.
    summary_priority = (
        "summary", "tmdb_description", "letterboxd_description", "tmdb_summary", "tvdb_summary",
        "tmdb_biography", "tmdb_person", "tmdb_collection_details", "trakt_list_details",
        "tmdb_list_details", "letterboxd_list_details", "icheckmovies_list_details",
        "tmdb_actor_details", "tmdb_crew_details", "tmdb_director_details", "tmdb_producer_details",
        "tmdb_writer_details", "tmdb_movie_details", "tvdb_movie_details", "tvdb_show_details",
        "tmdb_show_details",
    )
    summary = None
    for summary_method in summary_priority:
        if summary_method in self.summaries:
            logger.info(f"Detail: {summary_method} updated Collection Summary")
            summary = self.summaries[summary_method]
            break
    if summary:
        if str(summary) != str(self.obj.summary):
            edits["summary.value"] = summary
            edits["summary.locked"] = 1
    if "sort_title" in self.details:
        if str(self.details["sort_title"]) != str(self.obj.titleSort):
            edits["titleSort.value"] = self.details["sort_title"]
            edits["titleSort.locked"] = 1
            logger.info(f"Detail: sort_title updated Collection Sort Title to {self.details['sort_title']}")
    if "content_rating" in self.details:
        if str(self.details["content_rating"]) != str(self.obj.contentRating):
            edits["contentRating.value"] = self.details["content_rating"]
            edits["contentRating.locked"] = 1
            logger.info(f"Detail: content_rating updated Collection Content Rating to {self.details['content_rating']}")
    if "collection_mode" in self.details:
        current_mode = int(self.obj.collectionMode)
        if current_mode not in plex.collection_mode_keys \
                or plex.collection_mode_keys[current_mode] != self.details["collection_mode"]:
            self.library.collection_mode_query(self.obj, self.details["collection_mode"])
            logger.info(f"Detail: collection_mode updated Collection Mode to {self.details['collection_mode']}")
    if "collection_order" in self.details:
        current_order = int(self.obj.collectionSort)
        if current_order not in plex.collection_order_keys \
                or plex.collection_order_keys[current_order] != self.details["collection_order"]:
            self.library.collection_order_query(self.obj, self.details["collection_order"])
            logger.info(f"Detail: collection_order updated Collection Order to {self.details['collection_order']}")
    if "visible_library" in self.details or "visible_home" in self.details or "visible_shared" in self.details:
        visibility = self.library.collection_visibility(self.obj)
        visible_library = None
        visible_home = None
        visible_shared = None
        # Each setting is compared with its own current value; comparing all
        # three with "library" could skip or force home/shared updates.
        if "visible_library" in self.details and self.details["visible_library"] != visibility["library"]:
            visible_library = self.details["visible_library"]
        if "visible_home" in self.details and self.details["visible_home"] != visibility["home"]:
            visible_home = self.details["visible_home"]
        if "visible_shared" in self.details and self.details["visible_shared"] != visibility["shared"]:
            visible_shared = self.details["visible_shared"]
        if visible_library is not None or visible_home is not None or visible_shared is not None:
            self.library.collection_visibility_update(self.obj, visibility=visibility, library=visible_library, home=visible_home, shared=visible_shared)
            logger.info("Detail: Collection visibility updated")
    add_tags = self.details["label"] if "label" in self.details else None
    remove_tags = self.details["label.remove"] if "label.remove" in self.details else None
    sync_tags = self.details["label.sync"] if "label.sync" in self.details else None
    self.library.edit_tags("label", self.obj, add_tags=add_tags, remove_tags=remove_tags, sync_tags=sync_tags)
    if len(edits) > 0:
        logger.debug(edits)
        self.library.edit_query(self.obj, edits)
        logger.info("Details: have been updated")
    if self.library.asset_directory:
        name_mapping = self.name
        if "name_mapping" in self.details:
            if self.details["name_mapping"]:
                name_mapping = self.details["name_mapping"]
            else:
                logger.error(f"{self.Type} Error: name_mapping attribute is blank")
        poster_image, background_image = self.library.find_collection_assets(self.obj, name=name_mapping)
        if poster_image:
            self.posters["asset_directory"] = poster_image
        if background_image:
            self.backgrounds["asset_directory"] = background_image

    self.collection_poster = None
    if len(self.posters) > 0:
        logger.debug(f"{len(self.posters)} posters found:")
        for p in self.posters:
            logger.debug(f"Method: {p} Poster: {self.posters[p]}")
        poster_priority = (
            "url_poster", "file_poster", "tmdb_poster", "tmdb_profile", "tvdb_poster", "asset_directory",
            "tmdb_person", "tmdb_collection_details", "tmdb_actor_details", "tmdb_crew_details",
            "tmdb_director_details", "tmdb_producer_details", "tmdb_writer_details",
            "tmdb_movie_details", "tvdb_movie_details", "tvdb_show_details", "tmdb_show_details",
        )
        for method in poster_priority:
            if method not in self.posters:
                continue
            location = self.posters[method]
            if method == "asset_directory":
                # The asset lookup result is stored as-is, without wrapping.
                self.collection_poster = location
            elif method == "file_poster":
                self.collection_poster = ImageData(method, location, is_url=False)
            elif method == "tmdb_profile":
                # Kept as in the original: profile images are labelled "tmdb_poster".
                self.collection_poster = ImageData("tmdb_poster", location)
            else:
                self.collection_poster = ImageData(method, location)
            break
    else:
        logger.info("No poster collection detail or asset folder found")

    self.collection_background = None
    if len(self.backgrounds) > 0:
        logger.debug(f"{len(self.backgrounds)} backgrounds found:")
        for b in self.backgrounds:
            logger.debug(f"Method: {b} Background: {self.backgrounds[b]}")
        background_priority = (
            "url_background", "file_background", "tmdb_background", "tvdb_background", "asset_directory",
            "tmdb_collection_details", "tmdb_movie_details", "tvdb_movie_details", "tvdb_show_details",
            "tmdb_show_details",
        )
        for method in background_priority:
            if method not in self.backgrounds:
                continue
            location = self.backgrounds[method]
            if method == "asset_directory":
                self.collection_background = location
            elif method == "file_background":
                self.collection_background = ImageData(method, location, is_poster=False, is_url=False)
            else:
                self.collection_background = ImageData(method, location, is_poster=False)
            break
    else:
        logger.info("No background collection detail or asset folder found")

    if self.collection_poster or self.collection_background:
        self.library.upload_images(self.obj, poster=self.collection_poster, background=self.collection_background)
def sort_collection(self):
    """Reorder the collection's items in Plex.

    With ``self.sort_by`` set, the order comes from a ``plex_search`` filter
    restricted to this collection; otherwise ``self.rating_keys`` is used.
    Each item is then moved behind the previous one via
    ``self.library.move_item``, the first with ``after=None``.
    """
    logger.info("")
    util.separator(f"Sorting {self.name} Collection", space=False, border=False)
    logger.info("")
    if self.sort_by:
        search_data = self.build_filter("plex_search", {"sort_by": self.sort_by, "any": {"collection": self.name}})
        # Materialise once so the mapping and the ordered key list can both
        # be built from the same results.
        found = list(self.library.get_filter_items(search_data[2]))
        keys = {entry.ratingKey: entry for entry in found}
        rating_keys = [entry.ratingKey for entry in found]
    else:
        members = self.library.get_collection_items(self.obj, self.smart_label_collection)
        keys = {_i.ratingKey: _i for _i in members}
        rating_keys = self.rating_keys
    logger.debug(keys)
    logger.debug(rating_keys)
    previous = None
    for key in rating_keys:
        if previous:
            text = f"after {self.item_title(keys[previous])}"
        else:
            text = "to the beginning"
        logger.info(f"Moving {self.item_title(keys[key])} {text}")
        self.library.move_item(self.obj, key, after=previous)
        previous = key
def send_notifications(self):
    """Send the collection-change webhooks for this run.

    Nothing is sent unless a collection object exists,
    ``collection_changes_webhooks`` is configured, and the collection was
    created or has recorded additions/removals. A ``Failed`` raised by the
    webhook call is logged, not propagated.
    """
    if self.obj and self.details["collection_changes_webhooks"] and \
            (self.created or len(self.notification_additions) > 0 or len(self.notification_removals) > 0):
        self.obj.reload()
        # update_details() leaves these as None when no image was found, so
        # guard before reading .is_url / .location.
        poster = self.collection_poster
        background = self.collection_background
        try:
            self.library.Webhooks.collection_hooks(
                self.details["collection_changes_webhooks"],
                self.obj,
                poster_url=poster.location if poster and poster.is_url else None,
                background_url=background.location if background and background.is_url else None,
                created=self.created,
                deleted=self.deleted,
                additions=self.notification_additions,
                removals=self.notification_removals
            )
        except Failed as e:
            util.print_stacktrace()
            logger.error(f"Webhooks Error: {e}")
def run_collections_again(self):
    """Re-check the IDs queued for a second pass and add those now in the library.

    Reloads the collection, maps ``self.run_again_movies`` (and, in show
    libraries, ``self.run_again_shows``) to rating keys through the library
    maps, adds every fetched item that is not yet in the collection, records
    the additions and sends notifications. Afterwards the queued IDs that are
    still absent from the library maps are logged as missing.
    """
    self.obj = self.library.get_collection(self.name)
    name, collection_items = self.library.get_collection_name_and_items(self.obj, self.smart_label_collection)
    self.created = False
    self.notification_additions = []
    rating_keys = []
    for tmdb_id in self.run_again_movies:
        if tmdb_id in self.library.movie_map:
            rating_keys.extend(self.library.movie_map[tmdb_id])
    if self.library.is_show:
        for tvdb_id in self.run_again_shows:
            if tvdb_id in self.library.show_map:
                rating_keys.extend(self.library.show_map[tvdb_id])
    if len(rating_keys) > 0:
        for rating_key in rating_keys:
            try:
                current = self.library.fetchItem(int(rating_key))
            except (BadRequest, NotFound):
                logger.error(f"Plex Error: Item {rating_key} not found")
                continue
            if current in collection_items:
                logger.info(f"{name} Collection | = | {self.item_title(current)}")
                continue
            self.library.alter_collection(current, name, smart_label_collection=self.smart_label_collection)
            logger.info(f"{name} Collection | + | {self.item_title(current)}")
            add_id = None
            if self.library.is_movie and current.ratingKey in self.library.movie_rating_key_map:
                add_id = self.library.movie_rating_key_map[current.ratingKey]
            elif self.library.is_show and current.ratingKey in self.library.show_rating_key_map:
                add_id = self.library.show_rating_key_map[current.ratingKey]
            self.notification_additions.append({"title": current.title, "id": add_id})
        self.send_notifications()
        processed = len(rating_keys)
        logger.info(f"{processed} {self.collection_level.capitalize()}{'s' if processed > 1 else ''} Processed")
    if len(self.run_again_movies) > 0:
        logger.info("")
        for missing_id in self.run_again_movies:
            if missing_id in self.library.movie_map:
                continue
            try:
                movie = self.config.TMDb.get_movie(missing_id)
            except Failed as e:
                logger.error(e)
                continue
            if self.details["show_missing"] is True:
                if movie.release_date:
                    current_title = f"{movie.title} ({util.validate_date(movie.release_date, 'test').year})"
                else:
                    current_title = movie.title
                logger.info(f"{name} Collection | ? | {current_title} (TMDb: {missing_id})")
        logger.info("")
        queued_movies = len(self.run_again_movies)
        logger.info(f"{queued_movies} Movie{'s' if queued_movies > 1 else ''} Missing")
    if len(self.run_again_shows) > 0 and self.library.is_show:
        logger.info("")
        for missing_id in self.run_again_shows:
            if missing_id in self.library.show_map:
                continue
            try:
                title = self.config.TVDb.get_series(missing_id).title
            except Failed as e:
                logger.error(e)
                continue
            if self.details["show_missing"] is True:
                logger.info(f"{name} Collection | ? | {title} (TVDb: {missing_id})")
        queued_shows = len(self.run_again_shows)
        logger.info(f"{queued_shows} Show{'s' if queued_shows > 1 else ''} Missing")