diff --git a/modules/builder.py b/modules/builder.py index a82bc3d0..f7fde66e 100644 --- a/modules/builder.py +++ b/modules/builder.py @@ -279,7 +279,7 @@ class CollectionBuilder: logger.debug("") logger.debug("Validating Method: delete_not_scheduled") logger.debug(f"Value: {data[methods['delete_not_scheduled']]}") - self.details["delete_not_scheduled"] = self._parse("delete_not_scheduled", self.data, datatype="bool", methods=methods, default=False) + self.details["delete_not_scheduled"] = util.parse(self.Type, "delete_not_scheduled", self.data, datatype="bool", methods=methods, default=False) if "schedule" in methods and not self.config.requested_collections: logger.debug("") @@ -315,28 +315,28 @@ class CollectionBuilder: logger.debug("") logger.debug("Validating Method: validate_builders") logger.debug(f"Value: {data[methods['validate_builders']]}") - self.validate_builders = self._parse("validate_builders", self.data, datatype="bool", methods=methods, default=True) + self.validate_builders = util.parse(self.Type, "validate_builders", self.data, datatype="bool", methods=methods, default=True) self.run_again = False if "run_again" in methods: logger.debug("") logger.debug("Validating Method: run_again") logger.debug(f"Value: {data[methods['run_again']]}") - self.run_again = self._parse("run_again", self.data, datatype="bool", methods=methods, default=False) + self.run_again = util.parse(self.Type, "run_again", self.data, datatype="bool", methods=methods, default=False) self.build_collection = True if "build_collection" in methods and not self.playlist: logger.debug("") logger.debug("Validating Method: build_collection") logger.debug(f"Value: {data[methods['build_collection']]}") - self.build_collection = self._parse("build_collection", self.data, datatype="bool", methods=methods, default=True) + self.build_collection = util.parse(self.Type, "build_collection", self.data, datatype="bool", methods=methods, default=True) self.blank_collection = False if "blank_collection" in methods and not self.playlist: logger.debug("") logger.debug("Validating Method: blank_collection") logger.debug(f"Value: {data[methods['blank_collection']]}") - self.blank_collection = self._parse("blank_collection", self.data, datatype="bool", methods=methods, default=False) + self.blank_collection = util.parse(self.Type, "blank_collection", self.data, datatype="bool", methods=methods, default=False) self.sync = self.library.sync_mode == "sync" if "sync_mode" in methods: @@ -638,76 +638,6 @@ class CollectionBuilder: logger.info("") logger.info("Validation Successful") - def _parse(self, attribute, data, datatype=None, methods=None, parent=None, default=None, options=None, translation=None, minimum=1, maximum=None, regex=None): - display = f"{parent + ' ' if parent else ''}{attribute} attribute" - if options is None and translation is not None: - options = [o for o in translation] - value = data[methods[attribute]] if methods and attribute in methods else data - - if datatype == "list": - if value: - return [v for v in value if v] if isinstance(value, list) else [str(value)] - return [] - elif datatype == "intlist": - if value: - try: - return [int(v) for v in value if v] if isinstance(value, list) else [int(value)] - except ValueError: - pass - return [] - elif datatype == "dictlist": - final_list = [] - for dict_data in util.get_list(value): - if isinstance(dict_data, dict): - final_list.append((dict_data, {dm.lower(): dm for dm in dict_data})) - else: - raise Failed(f"{self.Type} Error: {display} {dict_data} is not a dictionary") - return final_list - elif methods and attribute not in methods: - message = f"{display} not found" - elif value is None: - message = f"{display} is blank" - elif regex is not None: - regex_str, example = regex - if re.compile(regex_str).match(str(value)): - return str(value) - else: - message = f"{display}: {value} must match pattern {regex_str} e.g. {example}" - elif datatype == "bool": - if isinstance(value, bool): - return value - elif isinstance(value, (int, float)): - return value > 0 - elif str(value).lower() in ["t", "true"]: - return True - elif str(value).lower() in ["f", "false"]: - return False - else: - message = f"{display} must be either true or false" - elif datatype in ["int", "float"]: - try: - value = int(str(value)) if datatype == "int" else float(str(value)) - if (maximum is None and minimum <= value) or (maximum is not None and minimum <= value <= maximum): - return value - except ValueError: - pass - pre = f"{display} {value} must be {'an integer' if datatype == 'int' else 'a number'}" - if maximum is None: - message = f"{pre} {minimum} or greater" - else: - message = f"{pre} between {minimum} and {maximum}" - elif (translation is not None and str(value).lower() not in translation) or \ - (options is not None and translation is None and str(value).lower() not in options): - message = f"{display} {value} must be in {', '.join([str(o) for o in options])}" - else: - return translation[value] if translation is not None else value - - if default is None: - raise Failed(f"{self.Type} Error: {message}") - else: - logger.warning(f"{self.Type} Warning: {message} using {default} as default") - return translation[default] if translation is not None else default - def _summary(self, method_name, method_data): if method_name == "summary": self.summaries[method_name] = method_data @@ -760,13 +690,13 @@ class CollectionBuilder: if method_name == "collection_mode": self.details[method_name] = util.check_collection_mode(method_data) elif method_name == "minimum_items": - self.minimum = self._parse(method_name, method_data, datatype="int", minimum=1) + self.minimum = util.parse(self.Type, method_name, method_data, datatype="int", minimum=1) elif method_name == "server_preroll": - self.server_preroll = self._parse(method_name, method_data) + self.server_preroll = util.parse(self.Type, method_name, method_data) elif method_name == "ignore_ids": - self.ignore_ids.extend(self._parse(method_name, method_data, datatype="intlist")) + self.ignore_ids.extend(util.parse(self.Type, method_name, method_data, datatype="intlist")) elif method_name == "ignore_imdb_ids": - self.ignore_imdb_ids.extend(self._parse(method_name, method_data, datatype="list")) + self.ignore_imdb_ids.extend(util.parse(self.Type, method_name, method_data, datatype="list")) elif method_name == "label": if "label" in methods and "label.sync" in methods: raise Failed(f"{self.Type} Error: Cannot use label and label.sync together") @@ -777,7 +707,7 @@ class CollectionBuilder: else: self.details[method_final] = util.get_list(method_data) if method_data else [] elif method_name == "changes_webhooks": - self.details[method_name] = self._parse(method_name, method_data, datatype="list") + self.details[method_name] = util.parse(self.Type, method_name, method_data, datatype="list") elif method_name in scheduled_boolean: if isinstance(method_data, bool): self.details[method_name] = method_data @@ -789,13 +719,13 @@ class CollectionBuilder: self.details[method_name] = False else: try: - util.schedule_check(method_name, self._parse(method_name, method_data), self.current_time, self.config.run_hour) + util.schedule_check(method_name, util.parse(self.Type, method_name, method_data), self.current_time, self.config.run_hour) self.details[method_name] = True except NotScheduled: self.details[method_name] = False elif method_name in boolean_details: default = self.details[method_name] if method_name in self.details else None - self.details[method_name] = self._parse(method_name, method_data, datatype="bool", default=default) + self.details[method_name] = util.parse(self.Type, method_name, method_data, datatype="bool", default=default) elif method_name in string_details: self.details[method_name] = str(method_data) @@ -852,9 +782,9 @@ class CollectionBuilder: self.library.overlays.append(name) self.item_details[method_name] = name elif method_name == "item_refresh_delay": - self.item_details[method_name] = self._parse(method_name, method_data, datatype="int", default=0, minimum=0) + self.item_details[method_name] = util.parse(self.Type, method_name, method_data, datatype="int", default=0, minimum=0) elif method_name in item_bool_details: - if self._parse(method_name, method_data, datatype="bool", default=False): + if util.parse(self.Type, method_name, method_data, datatype="bool", default=False): self.item_details[method_name] = True elif method_name in item_false_details: self.item_details[method_name] = False @@ -871,7 +801,7 @@ class CollectionBuilder: def _radarr(self, method_name, method_data): if method_name in ["radarr_add_missing", "radarr_add_existing", "radarr_monitor", "radarr_search"]: - self.radarr_details[method_name[7:]] = self._parse(method_name, method_data, datatype="bool") + self.radarr_details[method_name[7:]] = util.parse(self.Type, method_name, method_data, datatype="bool") elif method_name == "radarr_folder": self.radarr_details["folder"] = method_data elif method_name == "radarr_availability": @@ -886,7 +816,7 @@ class CollectionBuilder: def _sonarr(self, method_name, method_data): if method_name in ["sonarr_add_missing", "sonarr_add_existing", "sonarr_season", "sonarr_search", "sonarr_cutoff_search"]: - self.sonarr_details[method_name[7:]] = self._parse(method_name, method_data, datatype="bool") + self.sonarr_details[method_name[7:]] = util.parse(self.Type, method_name, method_data, datatype="bool") elif method_name in ["sonarr_folder", "sonarr_quality", "sonarr_language"]: self.sonarr_details[method_name[7:]] = method_data elif method_name == "sonarr_monitor": @@ -904,12 +834,12 @@ class CollectionBuilder: def _anidb(self, method_name, method_data): if method_name == "anidb_popular": - self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=30, maximum=30))) + self.builders.append((method_name, util.parse(self.Type, method_name, method_data, datatype="int", default=30, maximum=30))) elif method_name in ["anidb_id", "anidb_relation"]: for anidb_id in self.config.AniDB.validate_anidb_ids(method_data, self.language): self.builders.append((method_name, anidb_id)) elif method_name == "anidb_tag": - for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"): + for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"): new_dictionary = {} if "tag" not in dict_methods: raise Failed(f"{self.Type} Error: anidb_tag tag attribute is required") @@ -917,7 +847,7 @@ class CollectionBuilder: raise Failed(f"{self.Type} Error: anidb_tag tag attribute is blank") else: new_dictionary["tag"] = util.regex_first_int(dict_data[dict_methods["tag"]], "AniDB Tag ID") - new_dictionary["limit"] = self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name, minimum=0) + new_dictionary["limit"] = util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name, minimum=0) self.builders.append((method_name, new_dictionary)) def _anilist(self, method_name, method_data): @@ -925,50 +855,50 @@ class CollectionBuilder: for anilist_id in self.config.AniList.validate_anilist_ids(method_data, studio=method_name == "anilist_studio"): self.builders.append((method_name, anilist_id)) elif method_name in ["anilist_popular", "anilist_trending", "anilist_top_rated"]: - self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10))) + self.builders.append((method_name, util.parse(self.Type, method_name, method_data, datatype="int", default=10))) elif method_name == "anilist_search": if self.current_time.month in [12, 1, 2]: current_season = "winter" elif self.current_time.month in [3, 4, 5]: current_season = "spring" elif self.current_time.month in [6, 7, 8]: current_season = "summer" else: current_season = "fall" default_year = self.current_year + 1 if self.current_time.month == 12 else self.current_year - for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"): + for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"): new_dictionary = {} for search_method, search_data in dict_data.items(): search_attr, modifier = os.path.splitext(str(search_method).lower()) if search_method not in anilist.searches: raise Failed(f"{self.Type} Error: {method_name} {search_method} attribute not supported") elif search_attr == "season": - new_dictionary[search_attr] = self._parse(search_attr, search_data, parent=method_name, default=current_season, options=util.seasons) + new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, parent=method_name, default=current_season, options=util.seasons) if "year" not in dict_methods: logger.warning(f"Collection Warning: {method_name} year attribute not found using this year: {default_year} by default") new_dictionary["year"] = default_year elif search_attr == "year": - new_dictionary[search_attr] = self._parse(search_attr, search_data, datatype="int", parent=method_name, default=default_year, minimum=1917, maximum=default_year + 1) + new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, datatype="int", parent=method_name, default=default_year, minimum=1917, maximum=default_year + 1) elif search_data is None: raise Failed(f"{self.Type} Error: {method_name} {search_method} attribute is blank") elif search_attr == "adult": - new_dictionary[search_attr] = self._parse(search_attr, search_data, datatype="bool", parent=method_name) + new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, datatype="bool", parent=method_name) elif search_attr == "country": - new_dictionary[search_attr] = self._parse(search_attr, search_data, options=anilist.country_codes, parent=method_name) + new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, options=anilist.country_codes, parent=method_name) elif search_attr == "source": - new_dictionary[search_attr] = self._parse(search_attr, search_data, options=anilist.media_source, parent=method_name) + new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, options=anilist.media_source, parent=method_name) elif search_attr in ["episodes", "duration", "score", "popularity"]: - new_dictionary[search_method] = self._parse(search_method, search_data, datatype="int", parent=method_name) + new_dictionary[search_method] = util.parse(self.Type, search_method, search_data, datatype="int", parent=method_name) elif search_attr in ["format", "status", "genre", "tag", "tag_category"]: - new_dictionary[search_method] = self.config.AniList.validate(search_attr.replace("_", " ").title(), self._parse(search_method, search_data)) + new_dictionary[search_method] = self.config.AniList.validate(search_attr.replace("_", " ").title(), util.parse(self.Type, search_method, search_data)) elif search_attr in ["start", "end"]: new_dictionary[search_method] = util.validate_date(search_data, f"{method_name} {search_method} attribute", return_as="%m/%d/%Y") elif search_attr == "min_tag_percent": - new_dictionary[search_attr] = self._parse(search_attr, search_data, datatype="int", parent=method_name, minimum=0, maximum=100) + new_dictionary[search_attr] = util.parse(self.Type, search_attr, search_data, datatype="int", parent=method_name, minimum=0, maximum=100) elif search_attr == "search": new_dictionary[search_attr] = str(search_data) elif search_method not in ["sort_by", "limit"]: raise Failed(f"{self.Type} Error: {method_name} {search_method} attribute not supported") if len(new_dictionary) == 0: raise Failed(f"{self.Type} Error: {method_name} must have at least one valid search option") - new_dictionary["sort_by"] = self._parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=anilist.sort_options) - new_dictionary["limit"] = self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name) + new_dictionary["sort_by"] = util.parse(self.Type, "sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=anilist.sort_options) + new_dictionary["limit"] = util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name) self.builders.append((method_name, new_dictionary)) def _flixpatrol(self, method_name, method_data): @@ -977,26 +907,26 @@ class CollectionBuilder: for flixpatrol_list in flixpatrol_lists: self.builders.append(("flixpatrol_url", flixpatrol_list)) elif method_name in flixpatrol.builders: - for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"): + for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"): if method_name == "flixpatrol_demographics": data = { - "generation": self._parse("generation", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.generations), - "gender": self._parse("gender", dict_data, methods=dict_methods, parent=method_name, default="all", options=flixpatrol.gender), - "location": self._parse("location", dict_data, methods=dict_methods, parent=method_name, default="world", options=flixpatrol.demo_locations), - "limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10) + "generation": util.parse(self.Type, "generation", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.generations), + "gender": util.parse(self.Type, "gender", dict_data, methods=dict_methods, parent=method_name, default="all", options=flixpatrol.gender), + "location": util.parse(self.Type, "location", dict_data, methods=dict_methods, parent=method_name, default="world", options=flixpatrol.demo_locations), + "limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10) } elif method_name == "flixpatrol_popular": data = { - "source": self._parse("source", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.popular), - "time_window": self._parse("time_window", dict_data, methods=dict_methods, parent=method_name, default="today"), - "limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10) + "source": util.parse(self.Type, "source", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.popular), + "time_window": util.parse(self.Type, "time_window", dict_data, methods=dict_methods, parent=method_name, default="today"), + "limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10) } elif method_name == "flixpatrol_top": data = { - "platform": self._parse("platform", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.platforms), - "location": self._parse("location", dict_data, methods=dict_methods, parent=method_name, default="world", options=flixpatrol.locations), - "time_window": self._parse("time_window", dict_data, methods=dict_methods, parent=method_name, default="today"), - "limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10) + "platform": util.parse(self.Type, "platform", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.platforms), + "location": util.parse(self.Type, "location", dict_data, methods=dict_methods, parent=method_name, default="world", options=flixpatrol.locations), + "time_window": util.parse(self.Type, "time_window", dict_data, methods=dict_methods, parent=method_name, default="today"), + "limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10) } else: continue @@ -1045,50 +975,50 @@ class CollectionBuilder: for mal_id in util.get_int_list(method_data, "MyAnimeList ID"): self.builders.append((method_name, mal_id)) elif method_name in ["mal_all", "mal_airing", "mal_upcoming", "mal_tv", "mal_ova", "mal_movie", "mal_special", "mal_popular", "mal_favorite", "mal_suggested"]: - self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10, maximum=100 if method_name == "mal_suggested" else 500))) + self.builders.append((method_name, util.parse(self.Type, method_name, method_data, datatype="int", default=10, maximum=100 if method_name == "mal_suggested" else 500))) elif method_name in ["mal_season", "mal_userlist"]: - for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"): + for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"): if method_name == "mal_season": if self.current_time.month in [1, 2, 3]: default_season = "winter" elif self.current_time.month in [4, 5, 6]: default_season = "spring" elif self.current_time.month in [7, 8, 9]: default_season = "summer" else: default_season = "fall" self.builders.append((method_name, { - "season": self._parse("season", dict_data, methods=dict_methods, parent=method_name, default=default_season, options=util.seasons), - "sort_by": self._parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="members", options=mal.season_sort_options, translation=mal.season_sort_translation), - "year": self._parse("year", dict_data, datatype="int", methods=dict_methods, default=self.current_year, parent=method_name, minimum=1917, maximum=self.current_year + 1), - "limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=500) + "season": util.parse(self.Type, "season", dict_data, methods=dict_methods, parent=method_name, default=default_season, options=util.seasons), + "sort_by": util.parse(self.Type, "sort_by", dict_data, methods=dict_methods, parent=method_name, default="members", options=mal.season_sort_options, translation=mal.season_sort_translation), + "year": util.parse(self.Type, "year", dict_data, datatype="int", methods=dict_methods, default=self.current_year, parent=method_name, minimum=1917, maximum=self.current_year + 1), + "limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=500) })) elif method_name == "mal_userlist": self.builders.append((method_name, { - "username": self._parse("username", dict_data, methods=dict_methods, parent=method_name), - "status": self._parse("status", dict_data, methods=dict_methods, parent=method_name, default="all", options=mal.userlist_status), - "sort_by": self._parse("sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=mal.userlist_sort_options, translation=mal.userlist_sort_translation), - "limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=1000) + "username": util.parse(self.Type, "username", dict_data, methods=dict_methods, parent=method_name), + "status": util.parse(self.Type, "status", dict_data, methods=dict_methods, parent=method_name, default="all", options=mal.userlist_status), + "sort_by": util.parse(self.Type, "sort_by", dict_data, methods=dict_methods, parent=method_name, default="score", options=mal.userlist_sort_options, translation=mal.userlist_sort_translation), + "limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name, maximum=1000) })) elif method_name in ["mal_genre", "mal_studio"]: id_name = f"{method_name[4:]}_id" final_data = [] for data in util.get_list(method_data): final_data.append(data if isinstance(data, dict) else {id_name: data, "limit": 0}) - for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"): + for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"): self.builders.append((method_name, { - id_name: self._parse(id_name, dict_data, datatype="int", methods=dict_methods, parent=method_name, maximum=999999), - "limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name) + id_name: util.parse(self.Type, id_name, dict_data, datatype="int", methods=dict_methods, parent=method_name, maximum=999999), + "limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name) })) def _plex(self, method_name, method_data): if method_name in ["plex_all", "plex_pilots"]: self.builders.append((method_name, self.collection_level)) elif method_name in ["plex_search", "plex_collectionless"]: - for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"): + for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"): new_dictionary = {} if method_name == "plex_search": type_override = f"{self.collection_level}s" if self.collection_level in plex.collection_level_options else None new_dictionary = self.build_filter("plex_search", dict_data, type_override=type_override) elif method_name == "plex_collectionless": - prefix_list = self._parse("exclude_prefix", dict_data, datatype="list", methods=dict_methods) - exact_list = self._parse("exclude", dict_data, datatype="list", methods=dict_methods) + prefix_list = util.parse(self.Type, "exclude_prefix", dict_data, datatype="list", methods=dict_methods) + exact_list = util.parse(self.Type, "exclude", dict_data, datatype="list", methods=dict_methods) if len(prefix_list) == 0 and len(exact_list) == 0: raise Failed(f"{self.Type} Error: you must have at least one exclusion") exact_list.append(self.name) @@ -1099,26 +1029,26 @@ class CollectionBuilder: self.builders.append(("plex_search", self.build_filter("plex_search", {"any": {method_name: method_data}}))) def _stevenlu(self, method_name, method_data): - self.builders.append((method_name, self._parse(method_name, method_data, "bool"))) + self.builders.append((method_name, util.parse(self.Type, method_name, method_data, "bool"))) def _mdblist(self, method_name, method_data): for mdb_dict in self.config.Mdblist.validate_mdblist_lists(method_data): self.builders.append((method_name, mdb_dict)) def _tautulli(self, method_name, method_data): - for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"): + for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"): self.builders.append((method_name, { "list_type": "popular" if method_name == "tautulli_popular" else "watched", - "list_days": self._parse("list_days", dict_data, datatype="int", methods=dict_methods, default=30, parent=method_name), - "list_size": self._parse("list_size", dict_data, datatype="int", methods=dict_methods, default=10, parent=method_name), - "list_buffer": self._parse("list_buffer", dict_data, datatype="int", methods=dict_methods, default=20, parent=method_name), - "list_minimum": self._parse("list_minimum", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name) + "list_days": util.parse(self.Type, "list_days", dict_data, datatype="int", methods=dict_methods, default=30, parent=method_name), + "list_size": util.parse(self.Type, "list_size", dict_data, datatype="int", methods=dict_methods, default=10, parent=method_name), + "list_buffer": util.parse(self.Type, "list_buffer", dict_data, datatype="int", methods=dict_methods, default=20, parent=method_name), + "list_minimum": util.parse(self.Type, "list_minimum", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name) })) def _tmdb(self, method_name, method_data): if method_name == "tmdb_discover": - for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"): - new_dictionary = {"limit": self._parse("limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name)} + for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"): + new_dictionary = {"limit": util.parse(self.Type, "limit", dict_data, datatype="int", methods=dict_methods, default=100, parent=method_name)} for discover_method, discover_data in dict_data.items(): discover_attr, modifier = os.path.splitext(str(discover_method).lower()) if discover_data is None: @@ -1131,10 +1061,10 @@ class CollectionBuilder: raise Failed(f"{self.Type} Error: {method_name} {discover_method} attribute only works for movie libraries") elif discover_attr in ["language", "region"]: regex = ("([a-z]{2})-([A-Z]{2})", "en-US") if discover_attr == "language" else ("^[A-Z]{2}$", "US") - new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, parent=method_name, regex=regex) + new_dictionary[discover_attr] = util.parse(self.Type, discover_attr, discover_data, parent=method_name, regex=regex) elif discover_attr == "sort_by": options = tmdb.discover_movie_sort if self.library.is_movie else tmdb.discover_tv_sort - new_dictionary[discover_method] = self._parse(discover_attr, discover_data, parent=method_name, options=options) + new_dictionary[discover_method] = util.parse(self.Type, discover_attr, discover_data, parent=method_name, options=options) elif discover_attr == "certification_country": if "certification" in dict_data or "certification.lte" in dict_data or "certification.gte" in dict_data: new_dictionary[discover_method] = discover_data @@ -1152,23 +1082,23 @@ class CollectionBuilder: raise Failed(f"{self.Type} Error: {method_name} {discover_method} attribute: must be used with either with_watch_providers, without_watch_providers, or with_watch_monetization_types") elif discover_attr == "with_watch_monetization_types": if "watch_region" in dict_data: - new_dictionary[discover_method] = self._parse(discover_attr, discover_data, parent=method_name, options=tmdb.discover_monetization_types) + new_dictionary[discover_method] = util.parse(self.Type, discover_attr, discover_data, parent=method_name, options=tmdb.discover_monetization_types) else: raise Failed(f"{self.Type} Error: {method_name} {discover_method} attribute: must be used with watch_region") elif discover_attr in tmdb.discover_booleans: - new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="bool", parent=method_name) + new_dictionary[discover_attr] = util.parse(self.Type, discover_attr, discover_data, datatype="bool", parent=method_name) elif discover_attr == "vote_average": - new_dictionary[discover_method] = self._parse(discover_method, discover_data, datatype="float", parent=method_name) + new_dictionary[discover_method] = util.parse(self.Type, discover_method, discover_data, datatype="float", parent=method_name) elif discover_attr == "with_status": - new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="int", parent=method_name, minimum=0, maximum=5) + new_dictionary[discover_attr] = util.parse(self.Type, discover_attr, discover_data, datatype="int", parent=method_name, minimum=0, maximum=5) elif discover_attr == "with_type": - new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="int", parent=method_name, minimum=0, maximum=6) + new_dictionary[discover_attr] = util.parse(self.Type, discover_attr, discover_data, datatype="int", parent=method_name, minimum=0, maximum=6) elif discover_method in tmdb.discover_dates: new_dictionary[discover_method] = util.validate_date(discover_data, f"{method_name} {discover_method} attribute", return_as="%m/%d/%Y") elif discover_attr in tmdb.discover_years: - new_dictionary[discover_attr] = self._parse(discover_attr, discover_data, datatype="int", parent=method_name, minimum=1800, maximum=self.current_year + 1) + new_dictionary[discover_attr] = util.parse(self.Type, discover_attr, discover_data, datatype="int", parent=method_name, minimum=1800, maximum=self.current_year + 1) elif discover_attr in tmdb.discover_ints: - new_dictionary[discover_method] = self._parse(discover_method, discover_data, datatype="int", parent=method_name) + new_dictionary[discover_method] = util.parse(self.Type, discover_method, discover_data, datatype="int", parent=method_name) elif discover_method in tmdb.discover_strings: new_dictionary[discover_method] = discover_data elif discover_attr != "limit": @@ -1178,7 +1108,7 @@ class CollectionBuilder: else: raise Failed(f"{self.Type} Error: {method_name} had no valid fields") elif method_name in ["tmdb_popular", "tmdb_top_rated", "tmdb_now_playing", "tmdb_trending_daily", "tmdb_trending_weekly"]: - self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10))) + self.builders.append((method_name, util.parse(self.Type, method_name, method_data, datatype="int", default=10))) else: values = self.config.TMDb.validate_tmdb_ids(method_data, method_name) if method_name.endswith("_details"): @@ -1214,12 +1144,12 @@ class CollectionBuilder: for trakt_list in self.config.Trakt.validate_trakt(method_data, self.library.is_movie, trakt_type=method_name[6:]): self.builders.append((method_name, trakt_list)) elif method_name == "trakt_boxoffice": - if self._parse(method_name, method_data, datatype="bool", default=False): + if util.parse(self.Type, method_name, method_data, datatype="bool", default=False): self.builders.append((method_name, 10)) else: raise Failed(f"{self.Type} Error: {method_name} must be set to true") elif method_name in trakt.builders: - self.builders.append((method_name, self._parse(method_name, method_data, datatype="int", default=10))) + self.builders.append((method_name, util.parse(self.Type, method_name, method_data, datatype="int", default=10))) def _tvdb(self, method_name, method_data): values = util.get_list(method_data) @@ -1236,7 +1166,7 @@ class CollectionBuilder: self.builders.append((method_name[:-8] if method_name.endswith("_details") else method_name, value)) def _filters(self, method_name, method_data): - for dict_data, dict_methods in self._parse(method_name, method_data, datatype="dictlist"): + for dict_data, dict_methods in util.parse(self.Type, method_name, method_data, datatype="dictlist"): validate = True if "validate" in dict_data: if dict_data["validate"] is None: @@ -1651,7 +1581,7 @@ class CollectionBuilder: return util.get_list(data) elif attribute == "history": try: - return self._parse(final, data, datatype="int", maximum=30) + return util.parse(self.Type, final, data, datatype="int", maximum=30) except Failed: if str(data).lower() in ["day", "month"]: return data.lower() @@ -1691,15 +1621,15 @@ class CollectionBuilder: final_years = [] values = util.get_list(data) for value in values: - final_years.append(self._parse(final, value, datatype="int")) + final_years.append(util.parse(self.Type, final, value, datatype="int")) return smart_pair(final_years) elif (attribute in plex.number_attributes + plex.date_attributes + plex.year_attributes + ["tmdb_year"] and modifier in ["", ".not", ".gt", ".gte", ".lt", ".lte"]) \ or (attribute in plex.tag_attributes and modifier in [".count_gt", ".count_gte", ".count_lt", ".count_lte"]): - return self._parse(final, data, datatype="int") + return util.parse(self.Type, final, data, datatype="int") elif attribute in plex.float_attributes and modifier in [".gt", ".gte", ".lt", ".lte"]: - return self._parse(final, data, datatype="float", minimum=0, maximum=10) + return util.parse(self.Type, final, data, datatype="float", minimum=0, maximum=10) elif attribute in plex.boolean_attributes + boolean_filters: - return self._parse(attribute, data, datatype="bool") + return util.parse(self.Type, attribute, data, datatype="bool") else: raise Failed(f"{self.Type} Error: {final} attribute not supported") diff --git a/modules/config.py b/modules/config.py index 320090ad..cab8cc4c 100644 --- a/modules/config.py +++ b/modules/config.py @@ -566,6 +566,7 @@ class ConfigFile: util.separator(f"{display_name} Configuration") logger.info("") logger.info(f"Connecting to {display_name} Library...") + logger.info("") params["asset_directory"] = check_for_attribute(lib, "asset_directory", parent="settings", var_type="list_path", default=self.general["asset_directory"], default_is_none=True, save=False) if params["asset_directory"] is None: @@ -772,6 +773,9 @@ class ConfigFile: except NotScheduled: params["skip_library"] = True + logger.info("") + util.separator("Plex Configuration", space=False, border=False) + logger.info("") params["plex"] = { "url": check_for_attribute(lib, "url", parent="plex", var_type="url", default=self.general["plex"]["url"], req_default=True, save=False), "token": check_for_attribute(lib, "token", parent="plex", default=self.general["plex"]["token"], req_default=True, save=False), @@ -789,6 +793,18 @@ class ConfigFile: logger.info("") logger.info(f"{display_name} Library Connection Failed") continue + try: + logger.info("") + util.separator("Scaning Metadata Files", space=False, border=False) + logger.info("") + library.scan_metadata_files() + except Failed as e: + self.errors.append(e) + util.print_stacktrace() + util.print_multiline(e, error=True) + logger.info("") + logger.info(f"{display_name} Metadata Failed to Load") + continue if self.general["radarr"]["url"] or (lib and "radarr" in lib): logger.info("") diff --git a/modules/library.py b/modules/library.py index 465337a1..409e262b 100644 --- a/modules/library.py +++ b/modules/library.py @@ -95,6 +95,17 @@ class Library(ABC): self.library_operation = self.items_library_operation or self.delete_unmanaged_collections or self.delete_collections_with_less \ or self.radarr_remove_by_tag or self.sonarr_remove_by_tag or self.mass_collection_mode \ or self.genre_collections or self.show_unmanaged or self.metadata_backup + + if self.asset_directory: + logger.info("") + for ad in self.asset_directory: + logger.info(f"Using Asset Directory: {ad}") + + if output: + logger.info("") + logger.info(output) + + def scan_metadata_files(self): metadata = [] for file_type, metadata_file in self.metadata_path: if file_type == "Folder": @@ -110,7 +121,7 @@ class Library(ABC): metadata.append((file_type, metadata_file)) for file_type, metadata_file in metadata: try: - meta_obj = MetadataFile(config, self, file_type, metadata_file) + meta_obj = MetadataFile(self.config, self, file_type, metadata_file) if meta_obj.collections: self.collections.extend([c for c in meta_obj.collections]) if meta_obj.metadata: @@ -123,15 +134,6 @@ class Library(ABC): logger.info("") raise Failed("Config Error: No valid metadata files, playlist files, or library operations found") - if self.asset_directory: - logger.info("") - for ad in self.asset_directory: - logger.info(f"Using Asset Directory: {ad}") - - if output: - logger.info("") - logger.info(output) - def upload_images(self, item, poster=None, background=None, overlay=None): image = None image_compare = None diff --git a/modules/meta.py b/modules/meta.py index 09e33ede..746447cd 100644 --- a/modules/meta.py +++ b/modules/meta.py @@ -30,7 +30,7 @@ def get_dict(attribute, attr_data, check_list=None): logger.error(f"Config Error: {attribute} must be a dictionary") else: logger.error(f"Config Error: {attribute} attribute is blank") - return None + return {} class DataFile: @@ -224,10 +224,80 @@ class MetadataFile(DataFile): else: logger.info("") logger.info(f"Loading Metadata {file_type}: {path}") + logger.info("") data = self.load_file() self.metadata = get_dict("metadata", data, library.metadata_files) self.templates = get_dict("templates", data) self.collections = get_dict("collections", data, library.collections) + col_names = library.collections + [c for c in self.collections] + for auto_name, auto_data in get_dict("auto_collections", data).items(): + try: + auto_methods = {dm.lower(): dm for dm in auto_data} + if "auto_list" not in auto_methods: + raise Failed(f"Config Error: {auto_name}'s auto_list attribute not found") + elif not auto_data[auto_methods["auto_list"]]: + raise Failed(f"Config Error: {auto_name}'s auto_list attribute is blank") + elif auto_data[auto_methods["auto_list"]] not in ["genre", "tmdb_collection"]: + raise Failed(f"Config Error: {auto_name}'s auto_list attribute {auto_data[auto_methods['auto_list']]} invalid") + else: + auto_type = auto_data[auto_methods["auto_list"]] + exclude = util.parse("Config", "exclude", auto_data, methods=auto_methods, datatype="list") if "exclude" in auto_methods else [] + if auto_type == "genre": + auto_list = {genre: genre for genre in library.get_genres() if genre not in exclude} + default_template = {"smart_filter": {"limit": 50, "sort_by": "critic_rating.desc", "all": {"genre": "<>"}}} + default_title_format = "Top <> <<library_type>>s" + elif auto_type == "tmdb_collection": + auto_list = {} + items = library.get_all() + for i, item in enumerate(items, 1): + util.print_return(f"Processing: {i}/{len(items)} {item.title}") + tmdb_id, tvdb_id, imdb_id = library.get_ids(item) + tmdb_item = config.TMDb.get_item(item, tmdb_id, tvdb_id, imdb_id, is_movie=True) + if tmdb_item and tmdb_item.collection and tmdb_item.collection.id not in exclude and tmdb_item.collection.name not in exclude: + auto_list[tmdb_item.collection.id] = tmdb_item.collection.name + util.print_end() + default_template = {"tmdb_collection_details": "<<tmdb_collection>>"} + default_title_format = "<<title>>" + else: + raise Failed + title_format = util.parse("Config", "title_format", auto_data, methods=auto_methods, default=default_title_format) + + if "<<title>>" not in title_format: + logger.error(f"Config Error: <<title>> not in title_format: {title_format} using default: {default_title_format}") + title_format = default_title_format + if "<<library_type>>" in title_format: + title_format = title_format.replace("<<library_type>>", library.type) + dictionary_variables = util.parse("Config", "dictionary_variables", auto_data, methods=auto_methods, datatype="dictdict") if "dictionary_variables" in auto_methods else {} + template_name = util.parse("Config", "template", auto_data, methods=auto_methods) + if template_name: + if template_name not in self.templates: + raise Failed(f"Config Error: template: {template_name} not found") + if f"<<{auto_type}>>" not in str(self.templates[template_name]): + raise Failed(f"Config Error: template: {template_name} is required to have the template variable <<{auto_type}>>") + else: + self.templates[auto_name] = default_template + template_name = auto_name + remove_prefix = util.parse("Config", "remove_prefix", auto_data, methods=auto_methods, datatype="commalist") if "remove_prefix" in auto_methods else [] + remove_suffix = util.parse("Config", "remove_suffix", auto_data, methods=auto_methods, datatype="commalist") if "remove_suffix" in auto_methods else [] + for key, value in auto_list.items(): + template_call = {"name": template_name, auto_type: key} + for k, v in dictionary_variables.items(): + if key in v: + template_call[k] = v[key] + for prefix in remove_prefix: + if value.startswith(prefix): + value = value[len(prefix):].strip() + for suffix in remove_suffix: + if value.endswith(suffix): + value = value[:-len(suffix)].strip() + collection_title = title_format.replace("<<title>>", value) + if collection_title in col_names: + logger.warning(f"Config Warning: Skipping duplicate collection: {collection_title}") + else: + self.collections[collection_title] = {"template": template_call} + except Failed as e: + logger.error(e) + continue if not self.metadata and not self.collections: raise Failed("YAML Error: metadata or collections attribute is required") diff --git a/modules/plex.py b/modules/plex.py index 460ab9cb..d31bbb81 100644 --- a/modules/plex.py +++ b/modules/plex.py @@ -383,7 +383,6 @@ class Plex(Library): self.url = params["plex"]["url"] self.token = params["plex"]["token"] self.timeout = params["plex"]["timeout"] - logger.info("") try: self.PlexServer = PlexServer(baseurl=self.url, token=self.token, session=self.config.session, timeout=self.timeout) except Unauthorized: diff --git a/modules/util.py b/modules/util.py index 51984ea8..96be07ae 100644 --- a/modules/util.py +++ b/modules/util.py @@ -474,3 +474,84 @@ def schedule_check(attribute, data, current_time, run_hour): raise NotScheduledRange(schedule_str) elif skip_collection: raise NotScheduled(schedule_str) + +def parse(error, attribute, data, datatype=None, methods=None, parent=None, default=None, options=None, translation=None, minimum=1, maximum=None, regex=None): + display = f"{parent + ' ' if parent else ''}{attribute} attribute" + if options is None and translation is not None: + options = [o for o in translation] + value = data[methods[attribute]] if methods and attribute in methods else data + + if datatype in ["list", "commalist"]: + if value: + if datatype == "commalist": + value = get_list(value) + return [v for v in value if v] if isinstance(value, list) else [str(value)] + return [] + elif datatype == "intlist": + if value: + try: + return [int(v) for v in value if v] if isinstance(value, list) else [int(value)] + except ValueError: + pass + return [] + elif datatype == "dictlist": + final_list = [] + for dict_data in get_list(value): + if isinstance(dict_data, dict): + final_list.append((dict_data, {dm.lower(): dm for dm in dict_data})) + else: + raise Failed(f"{error} Error: {display} {dict_data} is not a dictionary") + return final_list + elif datatype == "dictdict": + final_dict = {} + if isinstance(value, dict): + for dict_key, dict_data in value.items(): + if isinstance(dict_data, dict) and dict_data: + final_dict[dict_key] = dict_data + else: + raise Failed(f"{error} Warning: {display} {dict_key} is not a dictionary") + return final_dict + elif methods and attribute not in methods: + message = f"{display} not found" + elif value is None: + message = f"{display} is blank" + elif regex is not None: + regex_str, example = regex + if re.compile(regex_str).match(str(value)): + return str(value) + else: + message = f"{display}: {value} must match pattern {regex_str} e.g. {example}" + elif datatype == "bool": + if isinstance(value, bool): + return value + elif isinstance(value, (int, float)): + return value > 0 + elif str(value).lower() in ["t", "true"]: + return True + elif str(value).lower() in ["f", "false"]: + return False + else: + message = f"{display} must be either true or false" + elif datatype in ["int", "float"]: + try: + value = int(str(value)) if datatype == "int" else float(str(value)) + if (maximum is None and minimum <= value) or (maximum is not None and minimum <= value <= maximum): + return value + except ValueError: + pass + pre = f"{display} {value} must be {'an integer' if datatype == 'int' else 'a number'}" + if maximum is None: + message = f"{pre} {minimum} or greater" + else: + message = f"{pre} between {minimum} and {maximum}" + elif (translation is not None and str(value).lower() not in translation) or \ + (options is not None and translation is None and str(value).lower() not in options): + message = f"{display} {value} must be in {', '.join([str(o) for o in options])}" + else: + return translation[value] if translation is not None else value + + if default is None: + raise Failed(f"{error} Error: {message}") + else: + logger.warning(f"{error} Warning: {message} using {default} as default") + return translation[default] if translation is not None else default