add auto_collections

pull/682/head
meisnate12 3 years ago
parent 04f0ad0aac
commit 86d9fdcdcc

@ -279,7 +279,7 @@ class CollectionBuilder:
logger.debug("")
logger.debug("Validating Method: delete_not_scheduled")
logger.debug(f"Value: {data[methods['delete_not_scheduled']]}")
self.details["delete_not_scheduled"] = self._parse("delete_not_scheduled", self.data, datatype="bool", methods=methods, default=False)
self.details["delete_not_scheduled"] = util.parse(self.Type, "delete_not_scheduled", self.data, datatype="bool", methods=methods, default=False)
if "schedule" in methods and not self.config.requested_collections:
logger.debug("")
@ -315,28 +315,28 @@ class CollectionBuilder:
logger.debug("")
logger.debug("Validating Method: validate_builders")
logger.debug(f"Value: {data[methods['validate_builders']]}")
self.validate_builders = self._parse("validate_builders", self.data, datatype="bool", methods=methods, default=True)
self.validate_builders = util.parse(self.Type, "validate_builders", self.data, datatype="bool", methods=methods, default=True)
self.run_again = False
if "run_again" in methods:
logger.debug("")
logger.debug("Validating Method: run_again")
logger.debug(f"Value: {data[methods['run_again']]}")
self.run_again = self._parse("run_again", self.data, datatype="bool", methods=methods, default=False)
self.run_again = util.parse(self.Type, "run_again", self.data, datatype="bool", methods=methods, default=False)
self.build_collection = True
if "build_collection" in methods and not self.playlist:
logger.debug("")
logger.debug("Validating Method: build_collection")
logger.debug(f"Value: {data[methods['build_collection']]}")
self.build_collection = self._parse("build_collection", self.data, datatype="bool", methods=methods, default=True)
self.build_collection = util.parse(self.Type, "build_collection", self.data, datatype="bool", methods=methods, default=True)
self.blank_collection = False
if "blank_collection" in methods and not self.playlist:
logger.debug("")
logger.debug("Validating Method: blank_collection")
logger.debug(f"Value: {data[methods['blank_collection']]}")
self.blank_collection = self._parse("blank_collection", self.data, datatype="bool", methods=methods, default=False)
self.blank_collection = util.parse(self.Type, "blank_collection", self.data, datatype="bool", methods=methods, default=False)
self.sync = self.library.sync_mode == "sync"
if "sync_mode" in methods:
@ -638,76 +638,6 @@ class CollectionBuilder:
logger.info("")
logger.info("Validation Successful")
def _parse(self, attribute, data, datatype=None, methods=None, parent=None, default=None, options=None, translation=None, minimum=1, maximum=None, regex=None):
display = f"{parent + ' ' if parent else ''}{attribute} attribute"
if options is None and translation is not None:
options = [o for o in translation]
value = data[methods[attribute]] if methods and attribute in methods else data
if datatype == "list":
if value:
return [v for v in value if v] if isinstance(value, list) else [str(value)]
return []
elif datatype == "intlist":
if value:
try:
return [int(v) for v in value if v] if isinstance(value, list) else [int(value)]
except ValueError:
pass
return []
elif datatype == "dictlist":
final_list = []
for dict_data in util.get_list(value):
if isinstance(dict_data, dict):
final_list.append((dict_data, {dm.lower(): dm for dm in dict_data}))
else:
raise Failed(f"{self.Type} Error: {display} {dict_data} is not a dictionary")
return final_list
elif methods and attribute not in methods:
message = f"{display} not found"
elif value is None:
message = f"{display} is blank"
elif regex is not None:
regex_str, example = regex
if re.compile(regex_str).match(str(value)):
return str(value)
else:
message = f"{display}: {value} must match pattern {regex_str} e.g. {example}"
elif datatype == "bool":
if isinstance(value, bool):
return value
elif isinstance(value, (int, float)):
return value > 0
elif str(value).lower() in ["t", "true"]:
return True
elif str(value).lower() in ["f", "false"]:
return False
else:
message = f"{display} must be either true or false"
elif datatype in ["int", "float"]:
try:
value = int(str(value)) if datatype == "int" else float(str(value))
if (maximum is None and minimum <= value) or (maximum is not None and minimum <= value <= maximum):
return value
except ValueError:
pass
pre = f"{display} {value} must be {'an integer' if datatype == 'int' else 'a number'}"
if maximum is None:
message = f"{pre} {minimum} or greater"
else:
message = f"{pre} between {minimum} and {maximum}"
elif (translation is not None and str(value).lower() not in translation) or \
(options is not None and translation is None and str(value).lower() not in options):
message = f"{display} {value} must be in {', '.join([str(o) for o in options])}"
else:
return translation[value] if translation is not None else value
if default is None:
raise Failed(f"{self.Type} Error: {message}")
else:
logger.warning(f"{self.Type} Warning: {message} using {default} as default")
return translation[default] if translation is not None else default
def _summary(self, method_name, method_data):
if method_name == "summary":
self.summaries[method_name] = method_data
@ -760,13 +690,13 @@ class CollectionBuilder:
if method_name == "collection_mode":
self.details[method_name] = util.check_collection_mode(method_data)
elif method_name == "minimum_items":
self.minimum = self._parse(method_name, method_data, datatype="int", minimum=1)
self.minimum = util.parse(self.Type, method_name, method_data, datatype="int", minimum=1)
elif method_name == "server_preroll":
self.server_preroll = self._parse(method_name, method_data)
self.server_preroll = util.parse(self.Type, method_name, method_data)
elif method_name == "ignore_ids":
self.ignore_ids.extend(self._parse(method_name, method_data, datatype="intlist"))
self.ignore_ids.extend(util.parse(self.Type, method_name, method_data, datatype="intlist"))
elif method_name == "ignore_imdb_ids":
self.ignore_imdb_ids.extend(self._parse(method_name, method_data, datatype="list"))
self.ignore_imdb_ids.extend(util.parse(self.Type, method_name, method_data, datatype="list"))
elif method_name == "label":
if "label" in methods and "label.sync" in methods:
raise Failed(f"{self.Type} Error: Cannot use label and label.sync together")
@ -777,7 +707,7 @@ class CollectionBuilder:
else:
self.details[method_final] = util.get_list(method_data) if method_data else []
elif method_name == "changes_webhooks":
self.details[method_name] = self._parse(method_name, method_data, datatype="list")
self.details[method_name] = util.parse(self.Type, method_name, method_data, datatype="list")
elif method_name in scheduled_boolean:
if isinstance(method_data, bool):
self.details[method_name] = method_data
@ -789,13 +719,13 @@ class CollectionBuilder:
self.details[method_name] = False
else:
try:
util.schedule_check(method_name, self._parse(method_name, method_data), self.current_time, self.config.run_hour)
util.schedule_check(method_name, util.parse(self.Type, method_name, method_data), self.current_time, self.config.run_hour)
self.details[method_name] = True
except NotScheduled:
self.details[method_name] = False
elif method_name in boolean_details:
default = self.details[method_name] if method_name in self.details else None
self.details[method_name] = self._parse(method_name, method_data, datatype="bool", default=default)
self.details[method_name] = util.parse(self.Type, method_name, method_data, datatype="bool", default=default)
elif method_name in string_details:
self.details[method_name] = str(method_data)
@ -852,9 +782,9 @@ class CollectionBuilder:
self.library.overlays.append(name)
self.item_details[method_name] = name
elif method_name == "item_refresh_delay":
self.item_details[method_name] = self._parse(method_name, method_data, datatype="int", default=0, minimum=0)
self.item_details[method_name] = util.parse(self.Type, method_name, method_data, datatype="int", default=0, minimum=0)
elif method_name in item_bool_details:
if self._parse(method_name, method_data, datatype="bool", default=False):
if util.parse(self.Type, method_name, method_data, datatype="bool", default=False):
self.item_details[method_name] = True
elif method_name in item_false_details:
self.item_details[method_name] = False
@ -871,7 +801,7 @@ class CollectionBuilder:
def _radarr(self, method_name, method_data):
if method_name in ["radarr_add_missing", "radarr_add_existing", "radarr_monitor", "radarr_search"]:
self.radarr_details[method_name[7:]] = self._parse(method_name, method_data, datatype="bool")
self.radarr_details[method_name[7:]] = util.parse(self.Type, method_name, method_data, datatype="bool")
elif method_name == "radarr_folder":
self.radarr_details["folder"] = method_data
elif method_name == "radarr_availability":
@ -886,7 +816,7 @@ class CollectionBuilder:
def _sonarr(self, method_name, method_data):
if method_name in ["sonarr_add_missing", "sonarr_add_existing", "sonarr_season", "sonarr_search", "sonarr_cutoff_search"]:
self.sonarr_details[method_name[7:]] = self._parse(method_name, method_data, datatype="bool")
self.sonarr_details[method_name[7:]] = util.parse(self.Type, method_name, method_data, datatype="bool")
elif method_name in ["sonarr_folder", "sonarr_quality", "sonarr_language"]:
self.sonarr_details[method_name[7:]] = method_data
elif method_name == "sonarr_monitor":
@ -904,12 +834,12 @@ class CollectionBuilder:
def _anidb(self, method_name, method_data):
if method_name == "anidb_popular":
self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=30, maximum=30)))
self.builders.append((method_name, util.parse(self.Type, method_name, method_data, datatype="int", default=30, maximum=30)))
elif method_name in ["anidb_id", "anidb_relation"]:
for anidb_id in self.config.AniDB.validate_anidb_ids(method_data, self.language):
self.builders.append((method_name, anidb_id))
elif method_name == "anidb_tag":
for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"):
new_dictionary = {}
if "tag" not in dict_methods:
raise Failed(f"{self.Type} Error: anidb_tag tag attribute is required")
@ -917,7 +847,7 @@ class CollectionBuilder:
raise Failed(f"{self.Type} Error: anidb_tag tag attribute is blank")
else:
new_dictionary["tag"] = util.regex_first_int(dict_data[dict_methods["tag"]], "AniDB Tag ID")
new_dictionary["limit"] = self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name, minimum=0)
new_dictionary["limit"] = util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name, minimum=0)
self.builders.append((method_name, new_dictionary))
def _anilist(self, method_name, method_data):
@ -925,50 +855,50 @@ class CollectionBuilder:
for anilist_id in self.config.AniList.validate_anilist_ids(method_data, studio=method_name == "anilist_studio"):
self.builders.append((method_name, anilist_id))
elif method_name in ["anilist_popular", "anilist_trending", "anilist_top_rated"]:
self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10)))
self.builders.append((method_name, util.parse(self.Type, method_name, method_data, datatype="int", default=10)))
elif method_name == "anilist_search":
if self.current_time.month in [12, 1, 2]: current_season = "winter"
elif self.current_time.month in [3, 4, 5]: current_season = "spring"
elif self.current_time.month in [6, 7, 8]: current_season = "summer"
else: current_season = "fall"
default_year = self.current_year + 1 if self.current_time.month == 12 else self.current_year
for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"):
new_dictionary = {}
for search_method, search_data in dict_data.items():
search_attr, modifier = os.path.splitext(str(search_method).lower())
if search_method not in anilist.searches:
raise Failed(f"{self.Type} Error: {method_name} {search_method} attribute not supported")
elif search_attr == "season":
new_dictionary[search_attr] = self._parse(search_attr, search_data, parent=method_name, default=current_season, options=util.seasons)
new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, parent=method_name, default=current_season, options=util.seasons)
if "year" not in dict_methods:
logger.warning(f"Collection Warning: {method_name} year attribute not found using this year: {default_year} by default")
new_dictionary["year"] = default_year
elif search_attr == "year":
new_dictionary[search_attr] = self._parse(search_attr, search_data, datatype="int", parent=method_name, default=default_year, minimum=1917, maximum=default_year + 1)
new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, datatype="int", parent=method_name, default=default_year, minimum=1917, maximum=default_year + 1)
elif search_data is None:
raise Failed(f"{self.Type} Error: {method_name} {search_method} attribute is blank")
elif search_attr == "adult":
new_dictionary[search_attr] = self._parse(search_attr, search_data, datatype="bool", parent=method_name)
new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, datatype="bool", parent=method_name)
elif search_attr == "country":
new_dictionary[search_attr] = self._parse(search_attr, search_data, options=anilist.country_codes, parent=method_name)
new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, options=anilist.country_codes, parent=method_name)
elif search_attr == "source":
new_dictionary[search_attr] = self._parse(search_attr, search_data, options=anilist.media_source, parent=method_name)
new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, options=anilist.media_source, parent=method_name)
elif search_attr in ["episodes", "duration", "score", "popularity"]:
new_dictionary[search_method] = self._parse(search_method, search_data, datatype="int", parent=method_name)
new_dictionary[search_method] = util.parse(self.Type, search_method, search_data, datatype="int", parent=method_name)
elif search_attr in ["format", "status", "genre", "tag", "tag_category"]:
new_dictionary[search_method] = self.config.AniList.validate(search_attr.replace("_", " ").title(), self._parse(search_method, search_data))
new_dictionary[search_method] = self.config.AniList.validate(search_attr.replace("_", " ").title(), util.parse(self.Type, search_method, search_data))
elif search_attr in ["start", "end"]:
new_dictionary[search_method] = util.validate_date(search_data, f"{method_name} {search_method} attribute", return_as="%m/%d/%Y")
elif search_attr == "min_tag_percent":
new_dictionary[search_attr] = self._parse(search_attr, search_data, datatype="int", parent=method_name, minimum=0, maximum=100)
new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, datatype="int", parent=method_name, minimum=0, maximum=100)
elif search_attr == "search":
new_dictionary[search_attr] = str(search_data)
elif search_method not in ["sort_by", "limit"]:
raise Failed(f"{self.Type} Error: {method_name} {search_method} attribute not supported")
if len(new_dictionary) == 0:
raise Failed(f"{self.Type} Error: {method_name} must have at least one valid search option")
new_dictionary["sort_by"] = self._parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=anilist.sort_options)
new_dictionary["limit"] = self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name)
new_dictionary["sort_by"] = util.parse(self.Type, "sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=anilist.sort_options)
new_dictionary["limit"] = util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name)
self.builders.append((method_name, new_dictionary))
def _flixpatrol(self, method_name, method_data):
@ -977,26 +907,26 @@ class CollectionBuilder:
for flixpatrol_list in flixpatrol_lists:
self.builders.append(("flixpatrol_url", flixpatrol_list))
elif method_name in flixpatrol.builders:
for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"):
if method_name == "flixpatrol_demographics":
data = {
"generation": self._parse("generation", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.generations),
"gender": self._parse("gender", dict_data, methods=dict_methods, parent=method_name, default="all", options=flixpatrol.gender),
"location": self._parse("location", dict_data, methods=dict_methods, parent=method_name, default="world", options=flixpatrol.demo_locations),
"limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10)
"generation": util.parse(self.Type, "generation", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.generations),
"gender": util.parse(self.Type, "gender", dict_data, methods=dict_methods, parent=method_name, default="all", options=flixpatrol.gender),
"location": util.parse(self.Type, "location", dict_data, methods=dict_methods, parent=method_name, default="world", options=flixpatrol.demo_locations),
"limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10)
}
elif method_name == "flixpatrol_popular":
data = {
"source": self._parse("source", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.popular),
"time_window": self._parse("time_window", dict_data, methods=dict_methods, parent=method_name, default="today"),
"limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10)
"source": util.parse(self.Type, "source", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.popular),
"time_window": util.parse(self.Type, "time_window", dict_data, methods=dict_methods, parent=method_name, default="today"),
"limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10)
}
elif method_name == "flixpatrol_top":
data = {
"platform": self._parse("platform", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.platforms),
"location": self._parse("location", dict_data, methods=dict_methods, parent=method_name, default="world", options=flixpatrol.locations),
"time_window": self._parse("time_window", dict_data, methods=dict_methods, parent=method_name, default="today"),
"limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10)
"platform": util.parse(self.Type, "platform", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.platforms),
"location": util.parse(self.Type, "location", dict_data, methods=dict_methods, parent=method_name, default="world", options=flixpatrol.locations),
"time_window": util.parse(self.Type, "time_window", dict_data, methods=dict_methods, parent=method_name, default="today"),
"limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10)
}
else:
continue
@ -1045,50 +975,50 @@ class CollectionBuilder:
for mal_id in util.get_int_list(method_data, "MyAnimeList ID"):
self.builders.append((method_name, mal_id))
elif method_name in ["mal_all", "mal_airing", "mal_upcoming", "mal_tv", "mal_ova", "mal_movie", "mal_special", "mal_popular", "mal_favorite", "mal_suggested"]:
self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10, maximum=100 if method_name == "mal_suggested" else 500)))
self.builders.append((method_name, util.parse(self.Type, method_name, method_data, datatype="int", default=10, maximum=100 if method_name == "mal_suggested" else 500)))
elif method_name in ["mal_season", "mal_userlist"]:
for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"):
if method_name == "mal_season":
if self.current_time.month in [1, 2, 3]: default_season = "winter"
elif self.current_time.month in [4, 5, 6]: default_season = "spring"
elif self.current_time.month in [7, 8, 9]: default_season = "summer"
else: default_season = "fall"
self.builders.append((method_name, {
"season": self._parse("season", dict_data, methods=dict_methods, parent=method_name, default=default_season, options=util.seasons),
"sort_by": self._parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="members", options=mal.season_sort_options, translation=mal.season_sort_translation),
"year": self._parse("year", dict_data, datatype="int", methods=dict_methods, default=self.current_year, parent=method_name, minimum=1917, maximum=self.current_year + 1),
"limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=500)
"season": util.parse(self.Type, "season", dict_data, methods=dict_methods, parent=method_name, default=default_season, options=util.seasons),
"sort_by": util.parse(self.Type, "sort_by", dict_data, methods=dict_methods, parent=method_name, default="members", options=mal.season_sort_options, translation=mal.season_sort_translation),
"year": util.parse(self.Type, "year", dict_data, datatype="int", methods=dict_methods, default=self.current_year, parent=method_name, minimum=1917, maximum=self.current_year + 1),
"limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=500)
}))
elif method_name == "mal_userlist":
self.builders.append((method_name, {
"username": self._parse("username", dict_data, methods=dict_methods, parent=method_name),
"status": self._parse("status", dict_data, methods=dict_methods, parent=method_name, default="all", options=mal.userlist_status),
"sort_by": self._parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=mal.userlist_sort_options, translation=mal.userlist_sort_translation),
"limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=1000)
"username": util.parse(self.Type, "username", dict_data, methods=dict_methods, parent=method_name),
"status": util.parse(self.Type, "status", dict_data, methods=dict_methods, parent=method_name, default="all", options=mal.userlist_status),
"sort_by": util.parse(self.Type, "sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=mal.userlist_sort_options, translation=mal.userlist_sort_translation),
"limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=1000)
}))
elif method_name in ["mal_genre", "mal_studio"]:
id_name = f"{method_name[4:]}_id"
final_data = []
for data in util.get_list(method_data):
final_data.append(data if isinstance(data, dict) else {id_name: data, "limit": 0})
for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"):
self.builders.append((method_name, {
id_name: self._parse(id_name, dict_data, datatype="int", methods=dict_methods, parent=method_name, maximum=999999),
"limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name)
id_name: util.parse(self.Type, id_name, dict_data, datatype="int", methods=dict_methods, parent=method_name, maximum=999999),
"limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name)
}))
def _plex(self, method_name, method_data):
if method_name in ["plex_all", "plex_pilots"]:
self.builders.append((method_name, self.collection_level))
elif method_name in ["plex_search", "plex_collectionless"]:
for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"):
new_dictionary = {}
if method_name == "plex_search":
type_override = f"{self.collection_level}s" if self.collection_level in plex.collection_level_options else None
new_dictionary = self.build_filter("plex_search", dict_data, type_override=type_override)
elif method_name == "plex_collectionless":
prefix_list = self._parse("exclude_prefix", dict_data, datatype="list", methods=dict_methods)
exact_list = self._parse("exclude", dict_data, datatype="list", methods=dict_methods)
prefix_list = util.parse(self.Type, "exclude_prefix", dict_data, datatype="list", methods=dict_methods)
exact_list = util.parse(self.Type, "exclude", dict_data, datatype="list", methods=dict_methods)
if len(prefix_list) == 0 and len(exact_list) == 0:
raise Failed(f"{self.Type} Error: you must have at least one exclusion")
exact_list.append(self.name)
@ -1099,26 +1029,26 @@ class CollectionBuilder:
self.builders.append(("plex_search", self.build_filter("plex_search", {"any": {method_name: method_data}})))
def _stevenlu(self, method_name, method_data):
self.builders.append((method_name, self._parse(method_name, method_data, "bool")))
self.builders.append((method_name, util.parse(self.Type, method_name, method_data, "bool")))
def _mdblist(self, method_name, method_data):
for mdb_dict in self.config.Mdblist.validate_mdblist_lists(method_data):
self.builders.append((method_name, mdb_dict))
def _tautulli(self, method_name, method_data):
for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"):
self.builders.append((method_name, {
"list_type": "popular" if method_name == "tautulli_popular" else "watched",
"list_days": self._parse("list_days", dict_data, datatype="int", methods=dict_methods, default=30, parent=method_name),
"list_size": self._parse("list_size", dict_data, datatype="int", methods=dict_methods, default=10, parent=method_name),
"list_buffer": self._parse("list_buffer", dict_data, datatype="int", methods=dict_methods, default=20, parent=method_name),
"list_minimum": self._parse("list_minimum", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name)
"list_days": util.parse(self.Type, "list_days", dict_data, datatype="int", methods=dict_methods, default=30, parent=method_name),
"list_size": util.parse(self.Type, "list_size", dict_data, datatype="int", methods=dict_methods, default=10, parent=method_name),
"list_buffer": util.parse(self.Type, "list_buffer", dict_data, datatype="int", methods=dict_methods, default=20, parent=method_name),
"list_minimum": util.parse(self.Type, "list_minimum", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name)
}))
def _tmdb(self, method_name, method_data):
if method_name == "tmdb_discover":
for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
new_dictionary = {"limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name)}
for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"):
new_dictionary = {"limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name)}
for discover_method, discover_data in dict_data.items():
discover_attr, modifier = os.path.splitext(str(discover_method).lower())
if discover_data is None:
@ -1131,10 +1061,10 @@ class CollectionBuilder:
raise Failed(f"{self.Type} Error: {method_name} {discover_method} attribute only works for movie libraries")
elif discover_attr in ["language", "region"]:
regex = ("([a-z]{2})-([A-Z]{2})", "en-US") if discover_attr == "language" else ("^[A-Z]{2}$", "US")
new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, parent=method_name, regex=regex)
new_dictionary[discover_attr] = util.parse(self.Type, discover_attr, discover_data, parent=method_name, regex=regex)
elif discover_attr == "sort_by":
options = tmdb.discover_movie_sort if self.library.is_movie else tmdb.discover_tv_sort
new_dictionary[discover_method] = self._parse(discover_attr, discover_data, parent=method_name, options=options)
new_dictionary[discover_method] = util.parse(self.Type, discover_attr, discover_data, parent=method_name, options=options)
elif discover_attr == "certification_country":
if "certification" in dict_data or "certification.lte" in dict_data or "certification.gte" in dict_data:
new_dictionary[discover_method] = discover_data
@ -1152,23 +1082,23 @@ class CollectionBuilder:
raise Failed(f"{self.Type} Error: {method_name} {discover_method} attribute: must be used with either with_watch_providers, without_watch_providers, or with_watch_monetization_types")
elif discover_attr == "with_watch_monetization_types":
if "watch_region" in dict_data:
new_dictionary[discover_method] = self._parse(discover_attr, discover_data, parent=method_name, options=tmdb.discover_monetization_types)
new_dictionary[discover_method] = util.parse(self.Type, discover_attr, discover_data, parent=method_name, options=tmdb.discover_monetization_types)
else:
raise Failed(f"{self.Type} Error: {method_name} {discover_method} attribute: must be used with watch_region")
elif discover_attr in tmdb.discover_booleans:
new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="bool", parent=method_name)
new_dictionary[discover_attr] = util.parse(self.Type, discover_attr, discover_data, datatype="bool", parent=method_name)
elif discover_attr == "vote_average":
new_dictionary[discover_method] = self._parse(discover_method, discover_data, datatype="float", parent=method_name)
new_dictionary[discover_method] = util.parse(self.Type, discover_method, discover_data, datatype="float", parent=method_name)
elif discover_attr == "with_status":
new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="int", parent=method_name, minimum=0, maximum=5)
new_dictionary[discover_attr] = util.parse(self.Type, discover_attr, discover_data, datatype="int", parent=method_name, minimum=0, maximum=5)
elif discover_attr == "with_type":
new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="int", parent=method_name, minimum=0, maximum=6)
new_dictionary[discover_attr] = util.parse(self.Type, discover_attr, discover_data, datatype="int", parent=method_name, minimum=0, maximum=6)
elif discover_method in tmdb.discover_dates:
new_dictionary[discover_method] = util.validate_date(discover_data, f"{method_name} {discover_method} attribute", return_as="%m/%d/%Y")
elif discover_attr in tmdb.discover_years:
new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="int", parent=method_name, minimum=1800, maximum=self.current_year + 1)
new_dictionary[discover_attr] = util.parse(self.Type, discover_attr, discover_data, datatype="int", parent=method_name, minimum=1800, maximum=self.current_year + 1)
elif discover_attr in tmdb.discover_ints:
new_dictionary[discover_method] = self._parse(discover_method, discover_data, datatype="int", parent=method_name)
new_dictionary[discover_method] = util.parse(self.Type, discover_method, discover_data, datatype="int", parent=method_name)
elif discover_method in tmdb.discover_strings:
new_dictionary[discover_method] = discover_data
elif discover_attr != "limit":
@ -1178,7 +1108,7 @@ class CollectionBuilder:
else:
raise Failed(f"{self.Type} Error: {method_name} had no valid fields")
elif method_name in ["tmdb_popular", "tmdb_top_rated", "tmdb_now_playing", "tmdb_trending_daily", "tmdb_trending_weekly"]:
self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10)))
self.builders.append((method_name, util.parse(self.Type, method_name, method_data, datatype="int", default=10)))
else:
values = self.config.TMDb.validate_tmdb_ids(method_data, method_name)
if method_name.endswith("_details"):
@ -1214,12 +1144,12 @@ class CollectionBuilder:
for trakt_list in self.config.Trakt.validate_trakt(method_data, self.library.is_movie, trakt_type=method_name[6:]):
self.builders.append((method_name, trakt_list))
elif method_name == "trakt_boxoffice":
if self._parse(method_name, method_data, datatype="bool", default=False):
if util.parse(self.Type, method_name, method_data, datatype="bool", default=False):
self.builders.append((method_name, 10))
else:
raise Failed(f"{self.Type} Error: {method_name} must be set to true")
elif method_name in trakt.builders:
self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10)))
self.builders.append((method_name, util.parse(self.Type, method_name, method_data, datatype="int", default=10)))
def _tvdb(self, method_name, method_data):
values = util.get_list(method_data)
@ -1236,7 +1166,7 @@ class CollectionBuilder:
self.builders.append((method_name[:-8] if method_name.endswith("_details") else method_name, value))
def _filters(self, method_name, method_data):
for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"):
for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"):
validate = True
if "validate" in dict_data:
if dict_data["validate"] is None:
@ -1651,7 +1581,7 @@ class CollectionBuilder:
return util.get_list(data)
elif attribute == "history":
try:
return self._parse(final, data, datatype="int", maximum=30)
return util.parse(self.Type, final, data, datatype="int", maximum=30)
except Failed:
if str(data).lower() in ["day", "month"]:
return data.lower()
@ -1691,15 +1621,15 @@ class CollectionBuilder:
final_years = []
values = util.get_list(data)
for value in values:
final_years.append(self._parse(final, value, datatype="int"))
final_years.append(util.parse(self.Type, final, value, datatype="int"))
return smart_pair(final_years)
elif (attribute in plex.number_attributes + plex.date_attributes + plex.year_attributes + ["tmdb_year"] and modifier in ["", ".not", ".gt", ".gte", ".lt", ".lte"]) \
or (attribute in plex.tag_attributes and modifier in [".count_gt", ".count_gte", ".count_lt", ".count_lte"]):
return self._parse(final, data, datatype="int")
return util.parse(self.Type, final, data, datatype="int")
elif attribute in plex.float_attributes and modifier in [".gt", ".gte", ".lt", ".lte"]:
return self._parse(final, data, datatype="float", minimum=0, maximum=10)
return util.parse(self.Type, final, data, datatype="float", minimum=0, maximum=10)
elif attribute in plex.boolean_attributes + boolean_filters:
return self._parse(attribute, data, datatype="bool")
return util.parse(self.Type, attribute, data, datatype="bool")
else:
raise Failed(f"{self.Type} Error: {final} attribute not supported")

@ -566,6 +566,7 @@ class ConfigFile:
util.separator(f"{display_name} Configuration")
logger.info("")
logger.info(f"Connecting to {display_name} Library...")
logger.info("")
params["asset_directory"] = check_for_attribute(lib, "asset_directory", parent="settings", var_type="list_path", default=self.general["asset_directory"], default_is_none=True, save=False)
if params["asset_directory"] is None:
@ -772,6 +773,9 @@ class ConfigFile:
except NotScheduled:
params["skip_library"] = True
logger.info("")
util.separator("Plex Configuration", space=False, border=False)
logger.info("")
params["plex"] = {
"url": check_for_attribute(lib, "url", parent="plex", var_type="url", default=self.general["plex"]["url"], req_default=True, save=False),
"token": check_for_attribute(lib, "token", parent="plex", default=self.general["plex"]["token"], req_default=True, save=False),
@ -789,6 +793,18 @@ class ConfigFile:
logger.info("")
logger.info(f"{display_name} Library Connection Failed")
continue
try:
logger.info("")
util.separator("Scaning Metadata Files", space=False, border=False)
logger.info("")
library.scan_metadata_files()
except Failed as e:
self.errors.append(e)
util.print_stacktrace()
util.print_multiline(e, error=True)
logger.info("")
logger.info(f"{display_name} Metadata Failed to Load")
continue
if self.general["radarr"]["url"] or (lib and "radarr" in lib):
logger.info("")

@ -95,6 +95,17 @@ class Library(ABC):
self.library_operation = self.items_library_operation or self.delete_unmanaged_collections or self.delete_collections_with_less \
or self.radarr_remove_by_tag or self.sonarr_remove_by_tag or self.mass_collection_mode \
or self.genre_collections or self.show_unmanaged or self.metadata_backup
if self.asset_directory:
logger.info("")
for ad in self.asset_directory:
logger.info(f"Using Asset Directory: {ad}")
if output:
logger.info("")
logger.info(output)
def scan_metadata_files(self):
metadata = []
for file_type, metadata_file in self.metadata_path:
if file_type == "Folder":
@ -110,7 +121,7 @@ class Library(ABC):
metadata.append((file_type, metadata_file))
for file_type, metadata_file in metadata:
try:
meta_obj = MetadataFile(config, self, file_type, metadata_file)
meta_obj = MetadataFile(self.config, self, file_type, metadata_file)
if meta_obj.collections:
self.collections.extend([c for c in meta_obj.collections])
if meta_obj.metadata:
@ -123,15 +134,6 @@ class Library(ABC):
logger.info("")
raise Failed("Config Error: No valid metadata files, playlist files, or library operations found")
if self.asset_directory:
logger.info("")
for ad in self.asset_directory:
logger.info(f"Using Asset Directory: {ad}")
if output:
logger.info("")
logger.info(output)
def upload_images(self, item, poster=None, background=None, overlay=None):
image = None
image_compare = None

@ -30,7 +30,7 @@ def get_dict(attribute, attr_data, check_list=None):
logger.error(f"Config Error: {attribute} must be a dictionary")
else:
logger.error(f"Config Error: {attribute} attribute is blank")
return None
return {}
class DataFile:
@ -224,10 +224,80 @@ class MetadataFile(DataFile):
else:
logger.info("")
logger.info(f"Loading Metadata {file_type}: {path}")
logger.info("")
data = self.load_file()
self.metadata = get_dict("metadata", data, library.metadata_files)
self.templates = get_dict("templates", data)
self.collections = get_dict("collections", data, library.collections)
col_names = library.collections + [c for c in self.collections]
for auto_name, auto_data in get_dict("auto_collections", data).items():
try:
auto_methods = {dm.lower(): dm for dm in auto_data}
if "auto_list" not in auto_methods:
raise Failed(f"Config Error: {auto_name}'s auto_list attribute not found")
elif not auto_data[auto_methods["auto_list"]]:
raise Failed(f"Config Error: {auto_name}'s auto_list attribute is blank")
elif auto_data[auto_methods["auto_list"]] not in ["genre", "tmdb_collection"]:
raise Failed(f"Config Error: {auto_name}'s auto_list attribute {auto_data[auto_methods['auto_list']]} invalid")
else:
auto_type = auto_data[auto_methods["auto_list"]]
exclude = util.parse("Config", "exclude", auto_data, methods=auto_methods, datatype="list") if "exclude" in auto_methods else []
if auto_type == "genre":
auto_list = {genre: genre for genre in library.get_genres() if genre not in exclude}
default_template = {"smart_filter": {"limit": 50, "sort_by": "critic_rating.desc", "all": {"genre": "<<genre>>"}}}
default_title_format = "Top <<title>> <<library_type>>s"
elif auto_type == "tmdb_collection":
auto_list = {}
items = library.get_all()
for i, item in enumerate(items, 1):
util.print_return(f"Processing: {i}/{len(items)} {item.title}")
tmdb_id, tvdb_id, imdb_id = library.get_ids(item)
tmdb_item = config.TMDb.get_item(item, tmdb_id, tvdb_id, imdb_id, is_movie=True)
if tmdb_item and tmdb_item.collection and tmdb_item.collection.id not in exclude and tmdb_item.collection.name not in exclude:
auto_list[tmdb_item.collection.id] = tmdb_item.collection.name
util.print_end()
default_template = {"tmdb_collection_details": "<<tmdb_collection>>"}
default_title_format = "<<title>>"
else:
raise Failed
title_format = util.parse("Config", "title_format", auto_data, methods=auto_methods, default=default_title_format)
if "<<title>>" not in title_format:
logger.error(f"Config Error: <<title>> not in title_format: {title_format} using default: {default_title_format}")
title_format = default_title_format
if "<<library_type>>" in title_format:
title_format = title_format.replace("<<library_type>>", library.type)
dictionary_variables = util.parse("Config", "dictionary_variables", auto_data, methods=auto_methods, datatype="dictdict") if "dictionary_variables" in auto_methods else {}
template_name = util.parse("Config", "template", auto_data, methods=auto_methods)
if template_name:
if template_name not in self.templates:
raise Failed(f"Config Error: template: {template_name} not found")
if f"<<{auto_type}>>" not in str(self.templates[template_name]):
raise Failed(f"Config Error: template: {template_name} is required to have the template variable <<{auto_type}>>")
else:
self.templates[auto_name] = default_template
template_name = auto_name
remove_prefix = util.parse("Config", "remove_prefix", auto_data, methods=auto_methods, datatype="commalist") if "remove_prefix" in auto_methods else []
remove_suffix = util.parse("Config", "remove_suffix", auto_data, methods=auto_methods, datatype="commalist") if "remove_suffix" in auto_methods else []
for key, value in auto_list.items():
template_call = {"name": template_name, auto_type: key}
for k, v in dictionary_variables.items():
if key in v:
template_call[k] = v[key]
for prefix in remove_prefix:
if value.startswith(prefix):
value = value[len(prefix):].strip()
for suffix in remove_suffix:
if value.endswith(suffix):
value = value[:-len(suffix)].strip()
collection_title = title_format.replace("<<title>>", value)
if collection_title in col_names:
logger.warning(f"Config Warning: Skipping duplicate collection: {collection_title}")
else:
self.collections[collection_title] = {"template": template_call}
except Failed as e:
logger.error(e)
continue
if not self.metadata and not self.collections:
raise Failed("YAML Error: metadata or collections attribute is required")

@ -383,7 +383,6 @@ class Plex(Library):
self.url = params["plex"]["url"]
self.token = params["plex"]["token"]
self.timeout = params["plex"]["timeout"]
logger.info("")
try:
self.PlexServer = PlexServer(baseurl=self.url, token=self.token, session=self.config.session, timeout=self.timeout)
except Unauthorized:

@ -474,3 +474,84 @@ def schedule_check(attribute, data, current_time, run_hour):
raise NotScheduledRange(schedule_str)
elif skip_collection:
raise NotScheduled(schedule_str)
def parse(error, attribute, data, datatype=None, methods=None, parent=None, default=None, options=None, translation=None, minimum=1, maximum=None, regex=None):
display = f"{parent + ' ' if parent else ''}{attribute} attribute"
if options is None and translation is not None:
options = [o for o in translation]
value = data[methods[attribute]] if methods and attribute in methods else data
if datatype in ["list", "commalist"]:
if value:
if datatype == "commalist":
value = get_list(value)
return [v for v in value if v] if isinstance(value, list) else [str(value)]
return []
elif datatype == "intlist":
if value:
try:
return [int(v) for v in value if v] if isinstance(value, list) else [int(value)]
except ValueError:
pass
return []
elif datatype == "dictlist":
final_list = []
for dict_data in get_list(value):
if isinstance(dict_data, dict):
final_list.append((dict_data, {dm.lower(): dm for dm in dict_data}))
else:
raise Failed(f"{error} Error: {display} {dict_data} is not a dictionary")
return final_list
elif datatype == "dictdict":
final_dict = {}
if isinstance(value, dict):
for dict_key, dict_data in value.items():
if isinstance(dict_data, dict) and dict_data:
final_dict[dict_key] = dict_data
else:
raise Failed(f"{error} Warning: {display} {dict_key} is not a dictionary")
return final_dict
elif methods and attribute not in methods:
message = f"{display} not found"
elif value is None:
message = f"{display} is blank"
elif regex is not None:
regex_str, example = regex
if re.compile(regex_str).match(str(value)):
return str(value)
else:
message = f"{display}: {value} must match pattern {regex_str} e.g. {example}"
elif datatype == "bool":
if isinstance(value, bool):
return value
elif isinstance(value, (int, float)):
return value > 0
elif str(value).lower() in ["t", "true"]:
return True
elif str(value).lower() in ["f", "false"]:
return False
else:
message = f"{display} must be either true or false"
elif datatype in ["int", "float"]:
try:
value = int(str(value)) if datatype == "int" else float(str(value))
if (maximum is None and minimum <= value) or (maximum is not None and minimum <= value <= maximum):
return value
except ValueError:
pass
pre = f"{display} {value} must be {'an integer' if datatype == 'int' else 'a number'}"
if maximum is None:
message = f"{pre} {minimum} or greater"
else:
message = f"{pre} between {minimum} and {maximum}"
elif (translation is not None and str(value).lower() not in translation) or \
(options is not None and translation is None and str(value).lower() not in options):
message = f"{display} {value} must be in {', '.join([str(o) for o in options])}"
else:
return translation[value] if translation is not None else value
if default is None:
raise Failed(f"{error} Error: {message}")
else:
logger.warning(f"{error} Warning: {message} using {default} as default")
return translation[default] if translation is not None else default

Loading…
Cancel
Save