Merge pull request #138 from meisnate12/develop

v1.6.0
pull/144/head v1.6.0
meisnate12 4 years ago committed by GitHub
commit 92c950e046
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,5 +1,5 @@
# Plex Meta Manager
#### Version 1.5.1
#### Version 1.6.0
The original concept for Plex Meta Manager is [Plex Auto Collections](https://github.com/mza921/Plex-Auto-Collections), but this is rewritten from the ground up to be able to include a scheduler, metadata edits, multiple libraries, and logging. Plex Meta Manager is a Python 3 script that can be continuously run using YAML configuration files to update on a schedule the metadata of the movies, shows, and collections in your libraries as well as automatically build collections based on various methods all detailed in the wiki. Some collection examples that the script can automatically build and update daily include Plex Based Searches like actor, genre, or studio collections or Collections based on TMDb, IMDb, Trakt, TVDb, AniDB, or MyAnimeList lists and various other services.
@ -17,7 +17,8 @@ The script is designed to work with most Metadata agents including the new Plex
## Support
* If you're getting an Error or have an Enhancement post in the [Issues](https://github.com/meisnate12/Plex-Meta-Manager/issues)
* Before posting on Github about an enhancement, error, or configuration question please visit the [Plex Meta Manager Discord Server](https://discord.gg/NfH6mGFuAB)
* If you're getting an error or have an enhancement post in the [Issues](https://github.com/meisnate12/Plex-Meta-Manager/issues)
* If you have a configuration question visit the [Discussions](https://github.com/meisnate12/Plex-Meta-Manager/discussions)
* To see user submitted Metadata configuration files and you could even add your own go to the [Plex Meta Manager Configs](https://github.com/meisnate12/Plex-Meta-Manager-Configs)
* Pull Request are welcome but please submit them to the develop branch

@ -9,6 +9,13 @@ class AniListAPI:
def __init__(self, config):
self.config = config
self.url = "https://graphql.anilist.co"
self.tags = {}
self.genres = {}
for tag in self.send_request("query{MediaTagCollection {name}}", {})["data"]["MediaTagCollection"]:
self.tags[tag["name"].lower()] = tag["name"]
for genre in self.send_request("query{GenreCollection}", {})["data"]["GenreCollection"]:
self.genres[genre.lower()] = genre
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def post(self, query, variables):
@ -56,7 +63,6 @@ class AniListAPI:
break
if 0 < limit == count:
break
return mal_ids
def top_rated(self, limit):
@ -93,6 +99,30 @@ class AniListAPI:
variables = {"season": season.upper(), "year": year, "sort": "SCORE_DESC" if sort == "score" else "POPULARITY_DESC"}
return self.get_pagenation(query, limit=limit, variables=variables)
def genre(self, genre, sort, limit):
query = """
query ($page: Int, $genre: String, $sort: [MediaSort]) {
Page(page: $page){
pageInfo {hasNextPage}
media(genre: $genre, sort: $sort){idMal}
}
}
"""
variables = {"genre": genre, "sort": "SCORE_DESC" if sort == "score" else "POPULARITY_DESC"}
return self.get_pagenation(query, limit=limit, variables=variables)
def tag(self, tag, sort, limit):
query = """
query ($page: Int, $tag: String, $sort: [MediaSort]) {
Page(page: $page){
pageInfo {hasNextPage}
media(tag: $tag, sort: $sort){idMal}
}
}
"""
variables = {"tag": tag, "sort": "SCORE_DESC" if sort == "score" else "POPULARITY_DESC"}
return self.get_pagenation(query, limit=limit, variables=variables)
def studio(self, studio_id):
query = """
query ($page: Int, $id: Int) {
@ -154,6 +184,16 @@ class AniListAPI:
return mal_ids, ignore_ids, name
def validate_genre(self, genre):
if genre.lower() in self.genres:
return self.genres[genre.lower()]
raise Failed(f"AniList Error: Genre: {genre} does not exist")
def validate_tag(self, tag):
if tag.lower() in self.tags:
return self.tags[tag.lower()]
raise Failed(f"AniList Error: Tag: {tag} does not exist")
def validate_anilist_ids(self, anilist_ids, studio=False):
anilist_values = []
for anilist_id in anilist_ids:
@ -183,7 +223,15 @@ class AniListAPI:
elif method == "anilist_season":
mal_ids = self.season(data["season"], data["year"], data["sort_by"], data["limit"])
if status_message:
logger.info(f"Processing {pretty}: {data['limit']} Anime from {util.pretty_seasons[data['season']]} {data['year']} sorted by {util.anilist_pretty[data['sort_by']]}")
logger.info(f"Processing {pretty}: {data['limit'] if data['limit'] > 0 else 'All'} Anime from {util.pretty_seasons[data['season']]} {data['year']} sorted by {util.anilist_pretty[data['sort_by']]}")
elif method == "anilist_genre":
mal_ids = self.genre(data["genre"], data["sort_by"], data["limit"])
if status_message:
logger.info(f"Processing {pretty}: {data['limit'] if data['limit'] > 0 else 'All'} Anime from the Genre: {data['genre']} sorted by {util.anilist_pretty[data['sort_by']]}")
elif method == "anilist_tag":
mal_ids = self.tag(data["tag"], data["sort_by"], data["limit"])
if status_message:
logger.info(f"Processing {pretty}: {data['limit'] if data['limit'] > 0 else 'All'} Anime from the Tag: {data['tag']} sorted by {util.anilist_pretty[data['sort_by']]}")
elif method in ["anilist_studio", "anilist_relations"]:
if method == "anilist_studio": mal_ids, name = self.studio(data)
else: mal_ids, _, name = self.relations(data)

@ -31,13 +31,15 @@ class CollectionBuilder:
current_time = datetime.now()
current_year = current_time.year
if "template" in data:
methods = {m.lower(): m for m in self.data}
if "template" in methods:
if not self.library.templates:
raise Failed("Collection Error: No templates found")
elif not data["template"]:
elif not self.data[methods["template"]]:
raise Failed("Collection Error: template attribute is blank")
else:
for data_template in util.get_list(data["template"], split=False):
for data_template in util.get_list(self.data[methods["template"]], split=False):
if not isinstance(data_template, dict):
raise Failed("Collection Error: template attribute is not a dictionary")
elif "name" not in data_template:
@ -84,9 +86,9 @@ class CollectionBuilder:
else:
raise Failed("Collection Error: template sub-attribute optional is blank")
for m in template:
if m not in self.data and m not in ["default", "optional"]:
if template[m]:
for method_name, attr_data in template:
if method_name not in self.data and method_name not in ["default", "optional"]:
if attr_data:
def replace_txt(txt):
txt = str(txt)
for option in optional:
@ -106,42 +108,42 @@ class CollectionBuilder:
try: return int(txt)
except ValueError: return txt
try:
if isinstance(template[m], dict):
attr = {}
for sm in template[m]:
if isinstance(template[m][sm], list):
if isinstance(attr_data, dict):
discover_name = {}
for sm in attr_data:
if isinstance(attr_data[sm], list):
temp_list = []
for li in template[m][sm]:
for li in attr_data[sm]:
temp_list.append(replace_txt(li))
attr[sm] = temp_list
discover_name[sm] = temp_list
else:
attr[sm] = replace_txt(template[m][sm])
elif isinstance(template[m], list):
attr = []
for li in template[m]:
discover_name[sm] = replace_txt(attr_data[sm])
elif isinstance(attr_data, list):
discover_name = []
for li in attr_data:
if isinstance(li, dict):
temp_dict = {}
for sm in li:
temp_dict[sm] = replace_txt(li[sm])
attr.append(temp_dict)
discover_name.append(temp_dict)
else:
attr.append(replace_txt(li))
discover_name.append(replace_txt(li))
else:
attr = replace_txt(template[m])
discover_name = replace_txt(attr_data)
except Failed:
continue
self.data[m] = attr
self.data[method_name] = discover_name
else:
raise Failed(f"Collection Error: template attribute {m} is blank")
raise Failed(f"Collection Error: template attribute {method_name} is blank")
skip_collection = True
if "schedule" not in data:
if "schedule" not in methods:
skip_collection = False
elif not data["schedule"]:
elif not self.data[methods["schedule"]]:
logger.error("Collection Error: schedule attribute is blank. Running daily")
skip_collection = False
else:
schedule_list = util.get_list(data["schedule"])
schedule_list = util.get_list(self.data[methods["schedule"]])
next_month = current_time.replace(day=28) + timedelta(days=4)
last_day = next_month - timedelta(days=next_month.day)
for schedule in schedule_list:
@ -191,13 +193,13 @@ class CollectionBuilder:
logger.info(f"Scanning {self.name} Collection")
self.collectionless = "plex_collectionless" in data
self.run_again = "run_again" in data
self.collectionless = "plex_collectionless" in methods
self.run_again = "run_again" in methods
if "tmdb_person" in data:
if data["tmdb_person"]:
if "tmdb_person" in methods:
if self.data[methods["tmdb_person"]]:
valid_names = []
for tmdb_id in util.get_int_list(data["tmdb_person"], "TMDb Person ID"):
for tmdb_id in util.get_int_list(self.data[methods["tmdb_person"]], "TMDb Person ID"):
person = config.TMDb.get_person(tmdb_id)
valid_names.append(person.name)
if hasattr(person, "biography") and person.biography:
@ -205,25 +207,25 @@ class CollectionBuilder:
if hasattr(person, "profile_path") and person.profile_path:
self.posters["tmdb_person"] = f"{config.TMDb.image_url}{person.profile_path}"
if len(valid_names) > 0: self.details["tmdb_person"] = valid_names
else: raise Failed(f"Collection Error: No valid TMDb Person IDs in {data['tmdb_person']}")
else: raise Failed(f"Collection Error: No valid TMDb Person IDs in {self.data[methods['tmdb_person']]}")
else:
raise Failed("Collection Error: tmdb_person attribute is blank")
for m in data:
if "tmdb" in m and not config.TMDb: raise Failed(f"Collection Error: {m} requires TMDb to be configured")
elif "trakt" in m and not config.Trakt: raise Failed(f"Collection Error: {m} requires Trakt todo be configured")
elif "imdb" in m and not config.IMDb: raise Failed(f"Collection Error: {m} requires TMDb or Trakt to be configured")
elif "tautulli" in m and not self.library.Tautulli: raise Failed(f"Collection Error: {m} requires Tautulli to be configured")
elif "mal" in m and not config.MyAnimeList: raise Failed(f"Collection Error: {m} requires MyAnimeList to be configured")
elif data[m] is not None:
for method_name, method_data in self.data.items():
if "tmdb" in method_name.lower() and not config.TMDb: raise Failed(f"Collection Error: {method_name} requires TMDb to be configured")
elif "trakt" in method_name.lower() and not config.Trakt: raise Failed(f"Collection Error: {method_name} requires Trakt todo be configured")
elif "imdb" in method_name.lower() and not config.IMDb: raise Failed(f"Collection Error: {method_name} requires TMDb or Trakt to be configured")
elif "tautulli" in method_name.lower() and not self.library.Tautulli: raise Failed(f"Collection Error: {method_name} requires Tautulli to be configured")
elif "mal" in method_name.lower() and not config.MyAnimeList: raise Failed(f"Collection Error: {method_name} requires MyAnimeList to be configured")
elif method_data is not None:
logger.debug("")
logger.debug(f"Method: {m}")
logger.debug(f"Value: {data[m]}")
if m in util.method_alias:
method_name = util.method_alias[m]
logger.warning(f"Collection Warning: {m} attribute will run as {method_name}")
logger.debug(f"Method: {method_name}")
logger.debug(f"Value: {method_data}")
if method_name.lower() in util.method_alias:
method_name = util.method_alias[method_name.lower()]
logger.warning(f"Collection Warning: {method_name} attribute will run as {method_name}")
else:
method_name = m
method_name = method_name.lower()
if method_name in util.show_only_lists and self.library.is_movie:
raise Failed(f"Collection Error: {method_name} attribute only works for show libraries")
elif method_name in util.movie_only_lists and self.library.is_show:
@ -233,344 +235,464 @@ class CollectionBuilder:
elif method_name not in util.collectionless_lists and self.collectionless:
raise Failed(f"Collection Error: {method_name} attribute does not work for Collectionless collection")
elif method_name == "summary":
self.summaries[method_name] = data[m]
self.summaries[method_name] = method_data
elif method_name == "tmdb_summary":
self.summaries[method_name] = config.TMDb.get_movie_show_or_collection(util.regex_first_int(data[m], "TMDb ID"), self.library.is_movie).overview
self.summaries[method_name] = config.TMDb.get_movie_show_or_collection(util.regex_first_int(method_data, "TMDb ID"), self.library.is_movie).overview
elif method_name == "tmdb_description":
self.summaries[method_name] = config.TMDb.get_list(util.regex_first_int(data[m], "TMDb List ID")).description
self.summaries[method_name] = config.TMDb.get_list(util.regex_first_int(method_data, "TMDb List ID")).description
elif method_name == "tmdb_biography":
self.summaries[method_name] = config.TMDb.get_person(util.regex_first_int(data[m], "TMDb Person ID")).biography
self.summaries[method_name] = config.TMDb.get_person(util.regex_first_int(method_data, "TMDb Person ID")).biography
elif method_name == "tvdb_summary":
self.summaries[method_name] = config.TVDb.get_movie_or_show(data[m], self.library.Plex.language, self.library.is_movie).summary
self.summaries[method_name] = config.TVDb.get_movie_or_show(method_data, self.library.Plex.language, self.library.is_movie).summary
elif method_name == "tvdb_description":
self.summaries[method_name] = config.TVDb.get_list_description(data[m], self.library.Plex.language)
self.summaries[method_name] = config.TVDb.get_list_description(method_data, self.library.Plex.language)
elif method_name == "trakt_description":
self.summaries[method_name] = config.Trakt.standard_list(config.Trakt.validate_trakt_list(util.get_list(data[m]))[0]).description
self.summaries[method_name] = config.Trakt.standard_list(config.Trakt.validate_trakt_list(util.get_list(method_data))[0]).description
elif method_name == "letterboxd_description":
self.summaries[method_name] = config.Letterboxd.get_list_description(data[m], self.library.Plex.language)
self.summaries[method_name] = config.Letterboxd.get_list_description(method_data, self.library.Plex.language)
elif method_name == "collection_mode":
if data[m] in ["default", "hide", "hide_items", "show_items", "hideItems", "showItems"]:
if data[m] == "hide_items": self.details[method_name] = "hideItems"
elif data[m] == "show_items": self.details[method_name] = "showItems"
else: self.details[method_name] = data[m]
if str(method_data).lower() == "default":
self.details[method_name] = "default"
elif str(method_data).lower() == "hide":
self.details[method_name] = "hide"
elif str(method_data).lower() in ["hide_items", "hideitems"]:
self.details[method_name] = "hideItems"
elif str(method_data).lower() in ["show_items", "showitems"]:
self.details[method_name] = "showItems"
else:
raise Failed(f"Collection Error: {data[m]} collection_mode Invalid\n| \tdefault (Library default)\n| \thide (Hide Collection)\n| \thide_items (Hide Items in this Collection)\n| \tshow_items (Show this Collection and its Items)")
raise Failed(f"Collection Error: {method_data} collection_mode invalid\n\tdefault (Library default)\n\thide (Hide Collection)\n\thide_items (Hide Items in this Collection)\n\tshow_items (Show this Collection and its Items)")
elif method_name == "collection_order":
if data[m] in ["release", "alpha"]:
self.details[method_name] = data[m]
if str(method_data).lower() == "release":
self.details[method_name] = "release"
elif str(method_data).lower() == "alpha":
self.details[method_name] = "release"
else:
raise Failed(f"Collection Error: {data[m]} collection_order Invalid\n| \trelease (Order Collection by release dates)\n| \talpha (Order Collection Alphabetically)")
raise Failed(f"Collection Error: {method_data} collection_order invalid\n\trelease (Order Collection by release dates)\n\talpha (Order Collection Alphabetically)")
elif method_name == "url_poster":
self.posters[method_name] = data[m]
self.posters[method_name] = method_data
elif method_name == "tmdb_poster":
self.posters[method_name] = f"{config.TMDb.image_url}{config.TMDb.get_movie_show_or_collection(util.regex_first_int(data[m], 'TMDb ID'), self.library.is_movie).poster_path}"
self.posters[method_name] = f"{config.TMDb.image_url}{config.TMDb.get_movie_show_or_collection(util.regex_first_int(method_data, 'TMDb ID'), self.library.is_movie).poster_path}"
elif method_name == "tmdb_profile":
self.posters[method_name] = f"{config.TMDb.image_url}{config.TMDb.get_person(util.regex_first_int(data[m], 'TMDb Person ID')).profile_path}"
self.posters[method_name] = f"{config.TMDb.image_url}{config.TMDb.get_person(util.regex_first_int(method_data, 'TMDb Person ID')).profile_path}"
elif method_name == "tvdb_poster":
self.posters[method_name] = f"{config.TVDb.get_movie_or_series(data[m], self.library.Plex.language, self.library.is_movie).poster_path}"
self.posters[method_name] = f"{config.TVDb.get_movie_or_series(method_data, self.library.Plex.language, self.library.is_movie).poster_path}"
elif method_name == "file_poster":
if os.path.exists(data[m]): self.posters[method_name] = os.path.abspath(data[m])
else: raise Failed(f"Collection Error: Poster Path Does Not Exist: {os.path.abspath(data[m])}")
if os.path.exists(method_data):
self.posters[method_name] = os.path.abspath(method_data)
else:
raise Failed(f"Collection Error: Poster Path Does Not Exist: {os.path.abspath(method_data)}")
elif method_name == "url_background":
self.backgrounds[method_name] = data[m]
self.backgrounds[method_name] = method_data
elif method_name == "tmdb_background":
self.backgrounds[method_name] = f"{config.TMDb.image_url}{config.TMDb.get_movie_show_or_collection(util.regex_first_int(data[m], 'TMDb ID'), self.library.is_movie).poster_path}"
self.backgrounds[method_name] = f"{config.TMDb.image_url}{config.TMDb.get_movie_show_or_collection(util.regex_first_int(method_data, 'TMDb ID'), self.library.is_movie).poster_path}"
elif method_name == "tvdb_background":
self.posters[method_name] = f"{config.TVDb.get_movie_or_series(data[m], self.library.Plex.language, self.library.is_movie).background_path}"
self.posters[method_name] = f"{config.TVDb.get_movie_or_series(method_data, self.library.Plex.language, self.library.is_movie).background_path}"
elif method_name == "file_background":
if os.path.exists(data[m]): self.backgrounds[method_name] = os.path.abspath(data[m])
else: raise Failed(f"Collection Error: Background Path Does Not Exist: {os.path.abspath(data[m])}")
if os.path.exists(method_data): self.backgrounds[method_name] = os.path.abspath(method_data)
else: raise Failed(f"Collection Error: Background Path Does Not Exist: {os.path.abspath(method_data)}")
elif method_name == "label_sync_mode":
if data[m] in ["append", "sync"]: self.details[method_name] = data[m]
if str(method_data).lower() in ["append", "sync"]: self.details[method_name] = method_data.lower()
else: raise Failed("Collection Error: label_sync_mode attribute must be either 'append' or 'sync'")
elif method_name == "sync_mode":
if data[m] in ["append", "sync"]: self.details[method_name] = data[m]
if str(method_data).lower() in ["append", "sync"]: self.details[method_name] = method_data.lower()
else: raise Failed("Collection Error: sync_mode attribute must be either 'append' or 'sync'")
elif method_name in ["arr_tag", "label"]:
self.details[method_name] = util.get_list(data[m])
self.details[method_name] = util.get_list(method_data)
elif method_name in util.boolean_details:
if isinstance(data[m], bool): self.details[method_name] = data[m]
if isinstance(method_data, bool): self.details[method_name] = method_data
elif str(method_data).lower() in ["t", "true"]: self.details[method_name] = True
elif str(method_data).lower() in ["f", "false"]: self.details[method_name] = False
else: raise Failed(f"Collection Error: {method_name} attribute must be either true or false")
elif method_name in util.all_details:
self.details[method_name] = data[m]
self.details[method_name] = method_data
elif method_name in ["title", "title.and", "title.not", "title.begins", "title.ends"]:
self.methods.append(("plex_search", [{method_name: util.get_list(method_data, split=False)}]))
elif method_name in ["decade", "year.greater", "year.less"]:
self.methods.append(("plex_search", [{method_name: util.check_year(method_data, current_year, method_name)}]))
elif method_name in ["added.before", "added.after", "originally_available.before", "originally_available.after"]:
self.methods.append(("plex_search", [{method_name: util.check_date(method_data, method_name, return_string=True, plex_date=True)}]))
elif method_name in ["duration.greater", "duration.less", "rating.greater", "rating.less"]:
self.methods.append(("plex_search", [{method_name: util.check_number(method_data, method_name, minimum=0)}]))
elif method_name in ["year", "year.not"]:
self.methods.append(("plex_search", [[(method_name, util.get_year_list(data[m], method_name))]]))
elif method_name in ["decade", "decade.not"]:
self.methods.append(("plex_search", [[(method_name, util.get_int_list(data[m], util.remove_not(method_name)))]]))
self.methods.append(("plex_search", [{method_name: util.get_year_list(method_data, current_year, method_name)}]))
elif method_name in util.tmdb_searches:
final_values = []
for value in util.get_list(data[m]):
for value in util.get_list(method_data):
if value.lower() == "tmdb" and "tmdb_person" in self.details:
for name in self.details["tmdb_person"]:
final_values.append(name)
else:
final_values.append(value)
self.methods.append(("plex_search", [[(method_name, final_values)]]))
elif method_name == "title":
self.methods.append(("plex_search", [[(method_name, util.get_list(data[m], split=False))]]))
self.methods.append(("plex_search", [{method_name: self.library.validate_search_list(final_values, os.path.splitext(method_name)[0])}]))
elif method_name in util.plex_searches:
self.methods.append(("plex_search", [[(method_name, util.get_list(data[m]))]]))
if method_name in util.tmdb_searches:
final_values = []
for value in util.get_list(method_data):
if value.lower() == "tmdb" and "tmdb_person" in self.details:
for name in self.details["tmdb_person"]:
final_values.append(name)
else:
final_values.append(value)
else:
final_values = method_data
self.methods.append(("plex_search", [{method_name: self.library.validate_search_list(final_values, os.path.splitext(method_name)[0])}]))
elif method_name == "plex_all":
self.methods.append((method_name, [""]))
elif method_name == "plex_collection":
self.methods.append((method_name, self.library.validate_collections(data[m] if isinstance(data[m], list) else [data[m]])))
self.methods.append((method_name, self.library.validate_collections(method_data if isinstance(method_data, list) else [method_data])))
elif method_name == "anidb_popular":
list_count = util.regex_first_int(data[m], "List Size", default=40)
list_count = util.regex_first_int(method_data, "List Size", default=40)
if 1 <= list_count <= 30:
self.methods.append((method_name, [list_count]))
else:
logger.warning("Collection Error: anidb_popular must be an integer between 1 and 30 defaulting to 30")
self.methods.append((method_name, [30]))
elif method_name == "mal_id":
self.methods.append((method_name, util.get_int_list(data[m], "MyAnimeList ID")))
self.methods.append((method_name, util.get_int_list(method_data, "MyAnimeList ID")))
elif method_name in ["anidb_id", "anidb_relation"]:
self.methods.append((method_name, config.AniDB.validate_anidb_list(util.get_int_list(data[m], "AniDB ID"), self.library.Plex.language)))
self.methods.append((method_name, config.AniDB.validate_anidb_list(util.get_int_list(method_data, "AniDB ID"), self.library.Plex.language)))
elif method_name in ["anilist_id", "anilist_relations", "anilist_studio"]:
self.methods.append((method_name, config.AniList.validate_anilist_ids(util.get_int_list(data[m], "AniList ID"), studio=method_name == "anilist_studio")))
self.methods.append((method_name, config.AniList.validate_anilist_ids(util.get_int_list(method_data, "AniList ID"), studio=method_name == "anilist_studio")))
elif method_name == "trakt_list":
self.methods.append((method_name, config.Trakt.validate_trakt_list(util.get_list(data[m]))))
self.methods.append((method_name, config.Trakt.validate_trakt_list(util.get_list(method_data))))
elif method_name == "trakt_list_details":
valid_list = config.Trakt.validate_trakt_list(util.get_list(data[m]))
valid_list = config.Trakt.validate_trakt_list(util.get_list(method_data))
item = config.Trakt.standard_list(valid_list[0])
if hasattr(item, "description") and item.description:
self.summaries[method_name] = item.description
self.methods.append((method_name[:-8], valid_list))
elif method_name == "trakt_watchlist":
self.methods.append((method_name, config.Trakt.validate_trakt_watchlist(util.get_list(data[m]), self.library.is_movie)))
self.methods.append((method_name, config.Trakt.validate_trakt_watchlist(util.get_list(method_data), self.library.is_movie)))
elif method_name == "imdb_list":
new_list = []
for imdb_list in util.get_list(data[m], split=False):
for imdb_list in util.get_list(method_data, split=False):
if isinstance(imdb_list, dict):
if "url" in imdb_list and imdb_list["url"]: imdb_url = imdb_list["url"]
else: raise Failed("Collection Error: imdb_list attribute url is required")
list_count = util.regex_first_int(imdb_list["limit"], "List Limit", default=0) if "limit" in imdb_list and imdb_list["limit"] else 0
dict_methods = {dm.lower(): dm for dm in imdb_list}
if "url" in dict_methods and imdb_list[dict_methods["url"]]:
imdb_url = imdb_list[dict_methods["url"]]
else:
raise Failed("Collection Error: imdb_list attribute url is required")
list_count = util.regex_first_int(imdb_list[dict_methods["limit"]], "List Limit", default=0) if "limit" in dict_methods and imdb_list[dict_methods["limit"]] else 0
else:
imdb_url = str(imdb_list)
list_count = 0
new_list.append({"url": imdb_url, "limit": list_count})
self.methods.append((method_name, new_list))
elif method_name == "letterboxd_list":
self.methods.append((method_name, util.get_list(data[m], split=False)))
self.methods.append((method_name, util.get_list(method_data, split=False)))
elif method_name == "letterboxd_list_details":
values = util.get_list(data[m], split=False)
values = util.get_list(method_data, split=False)
self.summaries[method_name] = config.Letterboxd.get_list_description(values[0], self.library.Plex.language)
self.methods.append((method_name[:-8], values))
elif method_name in util.dictionary_lists:
if isinstance(data[m], dict):
def get_int(parent, method, data_in, default_in, minimum=1, maximum=None):
if method not in data_in: logger.warning(f"Collection Warning: {parent} {method} attribute not found using {default_in} as default")
elif not data_in[method]: logger.warning(f"Collection Warning: {parent} {method} attribute is blank using {default_in} as default")
elif isinstance(data_in[method], int) and data_in[method] >= minimum:
if maximum is None or data_in[method] <= maximum: return data_in[method]
else: logger.warning(f"Collection Warning: {parent} {method} attribute {data_in[method]} invalid must an integer <= {maximum} using {default_in} as default")
else: logger.warning(f"Collection Warning: {parent} {method} attribute {data_in[method]} invalid must an integer >= {minimum} using {default_in} as default")
if isinstance(method_data, dict):
def get_int(parent, method, data_in, methods_in, default_in, minimum=1, maximum=None):
if method not in methods_in:
logger.warning(f"Collection Warning: {parent} {methods_in[method]} attribute not found using {default_in} as default")
elif not data_in[methods_in[method]]:
logger.warning(f"Collection Warning: {parent} {methods_in[method]} attribute is blank using {default_in} as default")
elif isinstance(data_in[methods_in[method]], int) and data_in[methods_in[method]] >= minimum:
if maximum is None or data_in[methods_in[method]] <= maximum:
return data_in[methods_in[method]]
else:
logger.warning(f"Collection Warning: {parent} {methods_in[method]} attribute {data_in[methods_in[method]]} invalid must an integer <= {maximum} using {default_in} as default")
else:
logger.warning(f"Collection Warning: {parent} {methods_in[method]} attribute {data_in[methods_in[method]]} invalid must an integer >= {minimum} using {default_in} as default")
return default_in
if method_name == "filters":
for f in data[m]:
if f in util.method_alias or (f.endswith(".not") and f[:-4] in util.method_alias):
filter_method = (util.method_alias[f[:-4]] + f[-4:]) if f.endswith(".not") else util.method_alias[f]
logger.warning(f"Collection Warning: {f} filter will run as {filter_method}")
for filter_name, filter_data in method_data.items():
if filter_name.lower() in util.method_alias or (filter_name.lower().endswith(".not") and filter_name.lower()[:-4] in util.method_alias):
filter_method = (util.method_alias[filter_name.lower()[:-4]] + filter_name.lower()[-4:]) if filter_name.lower().endswith(".not") else util.method_alias[filter_name.lower()]
logger.warning(f"Collection Warning: {filter_name} filter will run as {filter_method}")
else:
filter_method = f
filter_method = filter_name.lower()
if filter_method in util.movie_only_filters and self.library.is_show:
raise Failed(f"Collection Error: {filter_method} filter only works for movie libraries")
elif data[m][f] is None:
elif filter_data is None:
raise Failed(f"Collection Error: {filter_method} filter is blank")
elif filter_method == "year":
filter_data = util.get_year_list(data[m][f], f"{filter_method} filter")
valid_data = util.get_year_list(filter_data, current_year, f"{filter_method} filter")
elif filter_method in ["max_age", "duration.gte", "duration.lte", "tmdb_vote_count.gte", "tmdb_vote_count.lte"]:
filter_data = util.check_number(data[m][f], f"{filter_method} filter", minimum=1)
valid_data = util.check_number(filter_data, f"{filter_method} filter", minimum=1)
elif filter_method in ["year.gte", "year.lte"]:
filter_data = util.check_number(data[m][f], f"{filter_method} filter", minimum=1800, maximum=current_year)
valid_data = util.check_year(filter_data, current_year, f"{filter_method} filter")
elif filter_method in ["rating.gte", "rating.lte"]:
filter_data = util.check_number(data[m][f], f"{filter_method} filter", number_type="float", minimum=0.1, maximum=10)
valid_data = util.check_number(filter_data, f"{filter_method} filter", number_type="float", minimum=0.1, maximum=10)
elif filter_method in ["originally_available.gte", "originally_available.lte"]:
filter_data = util.check_date(data[m][f], f"{filter_method} filter")
valid_data = util.check_date(filter_data, f"{filter_method} filter")
elif filter_method == "original_language":
filter_data = util.get_list(data[m][f], lower=True)
valid_data = util.get_list(filter_data, lower=True)
elif filter_method == "collection":
filter_data = data[m][f] if isinstance(data[m][f], list) else [data[m][f]]
valid_data = filter_data if isinstance(filter_data, list) else [filter_data]
elif filter_method in util.all_filters:
filter_data = util.get_list(data[m][f])
valid_data = util.get_list(filter_data)
else:
raise Failed(f"Collection Error: {filter_method} filter not supported")
self.filters.append((filter_method, filter_data))
self.filters.append((filter_method, valid_data))
elif method_name == "plex_collectionless":
new_dictionary = {}
dict_methods = {dm.lower(): dm for dm in method_data}
prefix_list = []
if "exclude_prefix" in data[m] and data[m]["exclude_prefix"]:
if isinstance(data[m]["exclude_prefix"], list): prefix_list.extend(data[m]["exclude_prefix"])
else: prefix_list.append(str(data[m]["exclude_prefix"]))
if "exclude_prefix" in dict_methods and method_data[dict_methods["exclude_prefix"]]:
if isinstance(method_data[dict_methods["exclude_prefix"]], list):
prefix_list.extend(method_data[dict_methods["exclude_prefix"]])
else:
prefix_list.append(str(method_data[dict_methods["exclude_prefix"]]))
exact_list = []
if "exclude" in data[m] and data[m]["exclude"]:
if isinstance(data[m]["exclude"], list): exact_list.extend(data[m]["exclude"])
else: exact_list.append(str(data[m]["exclude"]))
if len(prefix_list) == 0 and len(exact_list) == 0: raise Failed("Collection Error: you must have at least one exclusion")
if "exclude" in dict_methods and method_data[dict_methods["exclude"]]:
if isinstance(method_data[dict_methods["exclude"]], list):
exact_list.extend(method_data[dict_methods["exclude"]])
else:
exact_list.append(str(method_data[dict_methods["exclude"]]))
if len(prefix_list) == 0 and len(exact_list) == 0:
raise Failed("Collection Error: you must have at least one exclusion")
new_dictionary["exclude_prefix"] = prefix_list
new_dictionary["exclude"] = exact_list
self.methods.append((method_name, [new_dictionary]))
elif method_name == "plex_search":
searches = []
used = []
for s in data[m]:
if s in util.method_alias or (s.endswith(".not") and s[:-4] in util.method_alias):
search = (util.method_alias[s[:-4]] + s[-4:]) if s.endswith(".not") else util.method_alias[s]
logger.warning(f"Collection Warning: {s} plex search attribute will run as {search}")
else:
search = s
if search in util.movie_only_searches and self.library.is_show:
raise Failed(f"Collection Error: {search} plex search attribute only works for movie libraries")
elif util.remove_not(search) in used:
raise Failed(f"Collection Error: Only one instance of {search} can be used try using it as a filter instead")
elif search in ["year", "year.not"]:
years = util.get_year_list(data[m][s], search)
if len(years) > 0:
used.append(util.remove_not(search))
searches.append((search, util.get_int_list(data[m][s], util.remove_not(search))))
elif search == "title":
used.append(util.remove_not(search))
searches.append((search, util.get_list(data[m][s], split=False)))
elif search in util.plex_searches:
used.append(util.remove_not(search))
searches.append((search, util.get_list(data[m][s])))
else:
logger.error(f"Collection Error: {search} plex search attribute not supported")
searches = {}
for search_name, search_data in method_data:
search, modifier = os.path.splitext(str(search_name).lower())
if search in util.method_alias:
search = util.method_alias[search]
logger.warning(f"Collection Warning: {str(search_name).lower()} plex search attribute will run as {search}{modifier if modifier else ''}")
search_final = f"{search}{modifier}"
if search_final in util.movie_only_searches and self.library.is_show:
raise Failed(f"Collection Error: {search_final} plex search attribute only works for movie libraries")
elif search_data is None:
raise Failed(f"Collection Error: {search_final} plex search attribute is blank")
elif search == "sort_by":
if str(search_data).lower() in util.plex_sort:
searches[search] = str(search_data).lower()
else:
logger.warning(f"Collection Error: {search_data} is not a valid plex search sort defaulting to title.asc")
elif search == "limit":
if not search_data:
raise Failed(f"Collection Warning: plex search limit attribute is blank")
elif not isinstance(search_data, int) and search_data > 0:
raise Failed(f"Collection Warning: plex search limit attribute: {search_data} must be an integer greater then 0")
else:
searches[search] = search_data
elif search == "title" and modifier in ["", ".and", ".not", ".begins", ".ends"]:
searches[search_final] = util.get_list(search_data, split=False)
elif (search == "studio" and modifier in ["", ".and", ".not", ".begins", ".ends"]) \
or (search in ["actor", "audio_language", "collection", "content_rating", "country", "director", "genre", "label", "producer", "subtitle_language", "writer"] and modifier in ["", ".and", ".not"]) \
or (search == "resolution" and modifier in [""]):
if search_final in util.tmdb_searches:
final_values = []
for value in util.get_list(search_data):
if value.lower() == "tmdb" and "tmdb_person" in self.details:
for name in self.details["tmdb_person"]:
final_values.append(name)
else:
final_values.append(value)
else:
final_values = search_data
searches[search_final] = self.library.validate_search_list(final_values, search)
elif (search == "decade" and modifier in [""]) \
or (search == "year" and modifier in [".greater", ".less"]):
searches[search_final] = util.check_year(search_data, current_year, search_final)
elif search in ["added", "originally_available"] and modifier in [".before", ".after"]:
searches[search_final] = util.check_date(search_data, search_final, return_string=True, plex_date=True)
elif search in ["duration", "rating"] and modifier in [".greater", ".less"]:
searches[search_final] = util.check_number(search_data, search_final, minimum=0)
elif search == "year" and modifier in ["", ".not"]:
searches[search_final] = util.get_year_list(search_data, current_year, search_final)
elif (search in ["title", "studio"] and modifier not in ["", ".and", ".not", ".begins", ".ends"]) \
or (search in ["actor", "audio_language", "collection", "content_rating", "country", "director", "genre", "label", "producer", "subtitle_language", "writer"] and modifier not in ["", ".and", ".not"]) \
or (search in ["resolution", "decade"] and modifier not in [""]) \
or (search in ["added", "originally_available"] and modifier not in [".before", ".after"]) \
or (search in ["duration", "rating"] and modifier not in [".greater", ".less"]) \
or (search in ["year"] and modifier not in ["", ".not", ".greater", ".less"]):
raise Failed(f"Collection Error: modifier: {modifier} not supported with the {search} plex search attribute")
else:
raise Failed(f"Collection Error: {search_final} plex search attribute not supported")
self.methods.append((method_name, [searches]))
elif method_name == "tmdb_discover":
new_dictionary = {"limit": 100}
for attr in data[m]:
if data[m][attr]:
attr_data = data[m][attr]
if (self.library.is_movie and attr in util.discover_movie) or (self.library.is_show and attr in util.discover_tv):
if attr == "language":
if re.compile("([a-z]{2})-([A-Z]{2})").match(str(attr_data)):
new_dictionary[attr] = str(attr_data)
else:
raise Failed(f"Collection Error: {m} attribute {attr}: {attr_data} must match pattern ([a-z]{{2}})-([A-Z]{{2}}) e.g. en-US")
elif attr == "region":
if re.compile("^[A-Z]{2}$").match(str(attr_data)):
new_dictionary[attr] = str(attr_data)
else:
raise Failed(f"Collection Error: {m} attribute {attr}: {attr_data} must match pattern ^[A-Z]{{2}}$ e.g. US")
elif attr == "sort_by":
if (self.library.is_movie and attr_data in util.discover_movie_sort) or (self.library.is_show and attr_data in util.discover_tv_sort):
new_dictionary[attr] = attr_data
else:
raise Failed(f"Collection Error: {m} attribute {attr}: {attr_data} is invalid")
elif attr == "certification_country":
if "certification" in data[m] or "certification.lte" in data[m] or "certification.gte" in data[m]:
new_dictionary[attr] = attr_data
else:
raise Failed(f"Collection Error: {m} attribute {attr}: must be used with either certification, certification.lte, or certification.gte")
elif attr in ["certification", "certification.lte", "certification.gte"]:
if "certification_country" in data[m]:
new_dictionary[attr] = attr_data
else:
raise Failed(f"Collection Error: {m} attribute {attr}: must be used with certification_country")
elif attr in ["include_adult", "include_null_first_air_dates", "screened_theatrically"]:
if attr_data is True:
new_dictionary[attr] = attr_data
elif attr in ["primary_release_date.gte", "primary_release_date.lte", "release_date.gte", "release_date.lte", "air_date.gte", "air_date.lte", "first_air_date.gte", "first_air_date.lte"]:
new_dictionary[attr] = util.check_date(attr_data, f"{m} attribute {attr}", return_string=True)
elif attr in ["primary_release_year", "year", "first_air_date_year"]:
new_dictionary[attr] = util.check_number(attr_data, f"{m} attribute {attr}", minimum=1800, maximum=current_year + 1)
elif attr in ["vote_count.gte", "vote_count.lte", "vote_average.gte", "vote_average.lte", "with_runtime.gte", "with_runtime.lte"]:
new_dictionary[attr] = util.check_number(attr_data, f"{m} attribute {attr}", minimum=1)
elif attr in ["with_cast", "with_crew", "with_people", "with_companies", "with_networks", "with_genres", "without_genres", "with_keywords", "without_keywords", "with_original_language", "timezone"]:
new_dictionary[attr] = attr_data
else:
raise Failed(f"Collection Error: {m} attribute {attr} not supported")
elif attr == "limit":
if isinstance(attr_data, int) and attr_data > 0:
new_dictionary[attr] = attr_data
else:
raise Failed(f"Collection Error: {m} attribute {attr}: must be a valid number greater then 0")
else:
raise Failed(f"Collection Error: {m} attribute {attr} not supported")
else:
raise Failed(f"Collection Error: {m} parameter {attr} is blank")
for discover_name, discover_data in method_data:
discover_final = discover_name.lower()
if discover_data:
if (self.library.is_movie and discover_final in util.discover_movie) or (self.library.is_show and discover_final in util.discover_tv):
if discover_final == "language":
if re.compile("([a-z]{2})-([A-Z]{2})").match(str(discover_data)):
new_dictionary[discover_final] = str(discover_data)
else:
raise Failed(f"Collection Error: {method_name} attribute {discover_final}: {discover_data} must match pattern ([a-z]{{2}})-([A-Z]{{2}}) e.g. en-US")
elif discover_final == "region":
if re.compile("^[A-Z]{2}$").match(str(discover_data)):
new_dictionary[discover_final] = str(discover_data)
else:
raise Failed(f"Collection Error: {method_name} attribute {discover_final}: {discover_data} must match pattern ^[A-Z]{{2}}$ e.g. US")
elif discover_final == "sort_by":
if (self.library.is_movie and discover_data in util.discover_movie_sort) or (self.library.is_show and discover_data in util.discover_tv_sort):
new_dictionary[discover_final] = discover_data
else:
raise Failed(f"Collection Error: {method_name} attribute {discover_final}: {discover_data} is invalid")
elif discover_final == "certification_country":
if "certification" in method_data or "certification.lte" in method_data or "certification.gte" in method_data:
new_dictionary[discover_final] = discover_data
else:
raise Failed(f"Collection Error: {method_name} attribute {discover_final}: must be used with either certification, certification.lte, or certification.gte")
elif discover_final in ["certification", "certification.lte", "certification.gte"]:
if "certification_country" in method_data:
new_dictionary[discover_final] = discover_data
else:
raise Failed(f"Collection Error: {method_name} attribute {discover_final}: must be used with certification_country")
elif discover_final in ["include_adult", "include_null_first_air_dates", "screened_theatrically"]:
if discover_data is True:
new_dictionary[discover_final] = discover_data
elif discover_final in util.discover_dates:
new_dictionary[discover_final] = util.check_date(discover_data, f"{method_name} attribute {discover_final}", return_string=True)
elif discover_final in ["primary_release_year", "year", "first_air_date_year"]:
new_dictionary[discover_final] = util.check_number(discover_data, f"{method_name} attribute {discover_final}", minimum=1800, maximum=current_year + 1)
elif discover_final in ["vote_count.gte", "vote_count.lte", "vote_average.gte", "vote_average.lte", "with_runtime.gte", "with_runtime.lte"]:
new_dictionary[discover_final] = util.check_number(discover_data, f"{method_name} attribute {discover_final}", minimum=1)
elif discover_final in ["with_cast", "with_crew", "with_people", "with_companies", "with_networks", "with_genres", "without_genres", "with_keywords", "without_keywords", "with_original_language", "timezone"]:
new_dictionary[discover_final] = discover_data
else:
raise Failed(f"Collection Error: {method_name} attribute {discover_final} not supported")
elif discover_final == "limit":
if isinstance(discover_data, int) and discover_data > 0:
new_dictionary[discover_final] = discover_data
else:
raise Failed(f"Collection Error: {method_name} attribute {discover_final}: must be a valid number greater then 0")
else:
raise Failed(f"Collection Error: {method_name} attribute {discover_final} not supported")
else:
raise Failed(f"Collection Error: {method_name} parameter {discover_final} is blank")
if len(new_dictionary) > 1:
self.methods.append((method_name, [new_dictionary]))
else:
raise Failed(f"Collection Error: {m} had no valid fields")
raise Failed(f"Collection Error: {method_name} had no valid fields")
elif "tautulli" in method_name:
new_dictionary = {}
if method_name == "tautulli_popular": new_dictionary["list_type"] = "popular"
elif method_name == "tautulli_watched": new_dictionary["list_type"] = "watched"
else: raise Failed(f"Collection Error: {method_name} attribute not supported")
new_dictionary["list_days"] = get_int(method_name, "list_days", data[m], 30)
new_dictionary["list_size"] = get_int(method_name, "list_size", data[m], 10)
new_dictionary["list_buffer"] = get_int(method_name, "list_buffer", data[m], 20)
if method_name == "tautulli_popular":
new_dictionary["list_type"] = "popular"
elif method_name == "tautulli_watched":
new_dictionary["list_type"] = "watched"
else:
raise Failed(f"Collection Error: {method_name} attribute not supported")
dict_methods = {dm.lower(): dm for dm in method_data}
new_dictionary["list_days"] = get_int(method_name, "list_days", method_data, dict_methods, 30)
new_dictionary["list_size"] = get_int(method_name, "list_size", method_data, dict_methods, 10)
new_dictionary["list_buffer"] = get_int(method_name, "list_buffer", method_data, dict_methods, 20)
self.methods.append((method_name, [new_dictionary]))
elif method_name == "mal_season":
new_dictionary = {"sort_by": "anime_num_list_users"}
if "sort_by" not in data[m]: logger.warning("Collection Warning: mal_season sort_by attribute not found using members as default")
elif not data[m]["sort_by"]: logger.warning("Collection Warning: mal_season sort_by attribute is blank using members as default")
elif data[m]["sort_by"] not in util.mal_season_sort: logger.warning(f"Collection Warning: mal_season sort_by attribute {data[m]['sort_by']} invalid must be either 'members' or 'score' using members as default")
else: new_dictionary["sort_by"] = util.mal_season_sort[data[m]["sort_by"]]
dict_methods = {dm.lower(): dm for dm in method_data}
if "sort_by" not in dict_methods:
logger.warning("Collection Warning: mal_season sort_by attribute not found using members as default")
elif not method_data[dict_methods["sort_by"]]:
logger.warning("Collection Warning: mal_season sort_by attribute is blank using members as default")
elif method_data[dict_methods["sort_by"]] not in util.mal_season_sort:
logger.warning(f"Collection Warning: mal_season sort_by attribute {method_data[dict_methods['sort_by']]} invalid must be either 'members' or 'score' using members as default")
else:
new_dictionary["sort_by"] = util.mal_season_sort[method_data[dict_methods["sort_by"]]]
if current_time.month in [1, 2, 3]: new_dictionary["season"] = "winter"
elif current_time.month in [4, 5, 6]: new_dictionary["season"] = "spring"
elif current_time.month in [7, 8, 9]: new_dictionary["season"] = "summer"
elif current_time.month in [10, 11, 12]: new_dictionary["season"] = "fall"
if "season" not in data[m]: logger.warning(f"Collection Warning: mal_season season attribute not found using the current season: {new_dictionary['season']} as default")
elif not data[m]["season"]: logger.warning(f"Collection Warning: mal_season season attribute is blank using the current season: {new_dictionary['season']} as default")
elif data[m]["season"] not in util.pretty_seasons: logger.warning(f"Collection Warning: mal_season season attribute {data[m]['season']} invalid must be either 'winter', 'spring', 'summer' or 'fall' using the current season: {new_dictionary['season']} as default")
else: new_dictionary["season"] = data[m]["season"]
if "season" not in dict_methods:
logger.warning(f"Collection Warning: mal_season season attribute not found using the current season: {new_dictionary['season']} as default")
elif not method_data[dict_methods["season"]]:
logger.warning(f"Collection Warning: mal_season season attribute is blank using the current season: {new_dictionary['season']} as default")
elif method_data[dict_methods["season"]] not in util.pretty_seasons:
logger.warning(f"Collection Warning: mal_season season attribute {method_data[dict_methods['season']]} invalid must be either 'winter', 'spring', 'summer' or 'fall' using the current season: {new_dictionary['season']} as default")
else:
new_dictionary["season"] = method_data[dict_methods["season"]]
new_dictionary["year"] = get_int(method_name, "year", data[m], current_time.year, minimum=1917, maximum=current_time.year + 1)
new_dictionary["limit"] = get_int(method_name, "limit", data[m], 100, maximum=500)
new_dictionary["year"] = get_int(method_name, "year", method_data, dict_methods, current_time.year, minimum=1917, maximum=current_time.year + 1)
new_dictionary["limit"] = get_int(method_name, "limit", method_data, dict_methods, 100, maximum=500)
self.methods.append((method_name, [new_dictionary]))
elif method_name == "mal_userlist":
new_dictionary = {"status": "all", "sort_by": "list_score"}
if "username" not in data[m]: raise Failed("Collection Error: mal_userlist username attribute is required")
elif not data[m]["username"]: raise Failed("Collection Error: mal_userlist username attribute is blank")
else: new_dictionary["username"] = data[m]["username"]
dict_methods = {dm.lower(): dm for dm in method_data}
if "username" not in dict_methods:
raise Failed("Collection Error: mal_userlist username attribute is required")
elif not method_data[dict_methods["username"]]:
raise Failed("Collection Error: mal_userlist username attribute is blank")
else:
new_dictionary["username"] = method_data[dict_methods["username"]]
if "status" not in data[m]: logger.warning("Collection Warning: mal_season status attribute not found using all as default")
elif not data[m]["status"]: logger.warning("Collection Warning: mal_season status attribute is blank using all as default")
elif data[m]["status"] not in util.mal_userlist_status: logger.warning(f"Collection Warning: mal_season status attribute {data[m]['status']} invalid must be either 'all', 'watching', 'completed', 'on_hold', 'dropped' or 'plan_to_watch' using all as default")
else: new_dictionary["status"] = util.mal_userlist_status[data[m]["status"]]
if "status" not in dict_methods:
logger.warning("Collection Warning: mal_season status attribute not found using all as default")
elif not method_data[dict_methods["status"]]:
logger.warning("Collection Warning: mal_season status attribute is blank using all as default")
elif method_data[dict_methods["status"]] not in util.mal_userlist_status:
logger.warning(f"Collection Warning: mal_season status attribute {method_data[dict_methods['status']]} invalid must be either 'all', 'watching', 'completed', 'on_hold', 'dropped' or 'plan_to_watch' using all as default")
else:
new_dictionary["status"] = util.mal_userlist_status[method_data[dict_methods["status"]]]
if "sort_by" not in data[m]: logger.warning("Collection Warning: mal_season sort_by attribute not found using score as default")
elif not data[m]["sort_by"]: logger.warning("Collection Warning: mal_season sort_by attribute is blank using score as default")
elif data[m]["sort_by"] not in util.mal_userlist_sort: logger.warning(f"Collection Warning: mal_season sort_by attribute {data[m]['sort_by']} invalid must be either 'score', 'last_updated', 'title' or 'start_date' using score as default")
else: new_dictionary["sort_by"] = util.mal_userlist_sort[data[m]["sort_by"]]
if "sort_by" not in dict_methods:
logger.warning("Collection Warning: mal_season sort_by attribute not found using score as default")
elif not method_data[dict_methods["sort_by"]]:
logger.warning("Collection Warning: mal_season sort_by attribute is blank using score as default")
elif method_data[dict_methods["sort_by"]] not in util.mal_userlist_sort:
logger.warning(f"Collection Warning: mal_season sort_by attribute {method_data[dict_methods['sort_by']]} invalid must be either 'score', 'last_updated', 'title' or 'start_date' using score as default")
else:
new_dictionary["sort_by"] = util.mal_userlist_sort[method_data[dict_methods["sort_by"]]]
new_dictionary["limit"] = get_int(method_name, "limit", data[m], 100, maximum=1000)
new_dictionary["limit"] = get_int(method_name, "limit", method_data, dict_methods, 100, maximum=1000)
self.methods.append((method_name, [new_dictionary]))
elif method_name == "anilist_season":
elif "anilist" in method_name:
new_dictionary = {"sort_by": "score"}
if "sort_by" not in data[m]: logger.warning("Collection Warning: anilist_season sort_by attribute not found using score as default")
elif not data[m]["sort_by"]: logger.warning("Collection Warning: anilist_season sort_by attribute is blank using score as default")
elif data[m]["sort_by"] not in ["score", "popular"]: logger.warning(f"Collection Warning: anilist_season sort_by attribute {data[m]['sort_by']} invalid must be either 'score' or 'popular' using score as default")
else: new_dictionary["sort_by"] = data[m]["sort_by"]
dict_methods = {dm.lower(): dm for dm in method_data}
if method_name == "anilist_season":
if current_time.month in [12, 1, 2]: new_dictionary["season"] = "winter"
elif current_time.month in [3, 4, 5]: new_dictionary["season"] = "spring"
elif current_time.month in [6, 7, 8]: new_dictionary["season"] = "summer"
elif current_time.month in [9, 10, 11]: new_dictionary["season"] = "fall"
if "season" not in data[m]: logger.warning(f"Collection Warning: anilist_season season attribute not found using the current season: {new_dictionary['season']} as default")
elif not data[m]["season"]: logger.warning(f"Collection Warning: anilist_season season attribute is blank using the current season: {new_dictionary['season']} as default")
elif data[m]["season"] not in util.pretty_seasons: logger.warning(f"Collection Warning: anilist_season season attribute {data[m]['season']} invalid must be either 'winter', 'spring', 'summer' or 'fall' using the current season: {new_dictionary['season']} as default")
else: new_dictionary["season"] = data[m]["season"]
if "season" not in dict_methods:
logger.warning(f"Collection Warning: anilist_season season attribute not found using the current season: {new_dictionary['season']} as default")
elif not method_data[dict_methods["season"]]:
logger.warning(f"Collection Warning: anilist_season season attribute is blank using the current season: {new_dictionary['season']} as default")
elif method_data[dict_methods["season"]] not in util.pretty_seasons:
logger.warning(f"Collection Warning: anilist_season season attribute {method_data[dict_methods['season']]} invalid must be either 'winter', 'spring', 'summer' or 'fall' using the current season: {new_dictionary['season']} as default")
else:
new_dictionary["season"] = method_data[dict_methods["season"]]
new_dictionary["year"] = get_int(method_name, "year", method_data, dict_methods, current_time.year, minimum=1917, maximum=current_time.year + 1)
elif method_name == "anilist_genre":
if "genre" not in dict_methods:
raise Failed(f"Collection Warning: anilist_genre genre attribute not found")
elif not method_data[dict_methods["genre"]]:
raise Failed(f"Collection Warning: anilist_genre genre attribute is blank")
else:
new_dictionary["genre"] = self.config.AniList.validate_genre(method_data[dict_methods["genre"]])
elif method_name == "anilist_tag":
if "tag" not in dict_methods:
raise Failed(f"Collection Warning: anilist_tag tag attribute not found")
elif not method_data[dict_methods["tag"]]:
raise Failed(f"Collection Warning: anilist_tag tag attribute is blank")
else:
new_dictionary["tag"] = self.config.AniList.validate_tag(method_data[dict_methods["tag"]])
if "sort_by" not in dict_methods:
logger.warning(f"Collection Warning: {method_name} sort_by attribute not found using score as default")
elif not method_data[dict_methods["sort_by"]]:
logger.warning(f"Collection Warning: {method_name} sort_by attribute is blank using score as default")
elif str(method_data[dict_methods["sort_by"]]).lower() not in ["score", "popular"]:
logger.warning(f"Collection Warning: {method_name} sort_by attribute {method_data[dict_methods['sort_by']]} invalid must be either 'score' or 'popular' using score as default")
else:
new_dictionary["sort_by"] = method_data[dict_methods["sort_by"]]
new_dictionary["limit"] = get_int(method_name, "limit", method_data, dict_methods, 0, maximum=500)
new_dictionary["year"] = get_int(method_name, "year", data[m], current_time.year, minimum=1917, maximum=current_time.year + 1)
new_dictionary["limit"] = get_int(method_name, "limit", data[m], 0, maximum=500)
self.methods.append((method_name, [new_dictionary]))
else:
raise Failed(f"Collection Error: {m} attribute is not a dictionary: {data[m]}")
raise Failed(f"Collection Error: {method_name} attribute is not a dictionary: {method_data}")
elif method_name in util.count_lists:
list_count = util.regex_first_int(data[m], "List Size", default=10)
list_count = util.regex_first_int(method_data, "List Size", default=10)
if list_count < 1:
logger.warning(f"Collection Warning: {method_name} must be an integer greater then 0 defaulting to 10")
list_count = 10
self.methods.append((method_name, [list_count]))
elif "tvdb" in method_name:
values = util.get_list(data[m])
values = util.get_list(method_data)
if method_name[-8:] == "_details":
if method_name == "tvdb_movie_details":
item = config.TVDb.get_movie(self.library.Plex.language, values[0])
@ -594,7 +716,7 @@ class CollectionBuilder:
else:
self.methods.append((method_name, values))
elif method_name in util.tmdb_lists:
values = config.TMDb.validate_tmdb_list(util.get_int_list(data[m], f"TMDb {util.tmdb_type[method_name]} ID"), util.tmdb_type[method_name])
values = config.TMDb.validate_tmdb_list(util.get_int_list(method_data, f"TMDb {util.tmdb_type[method_name]} ID"), util.tmdb_type[method_name])
if method_name[-8:] == "_details":
if method_name in ["tmdb_collection_details", "tmdb_movie_details", "tmdb_show_details"]:
item = config.TMDb.get_movie_show_or_collection(values[0], self.library.is_movie)
@ -618,19 +740,22 @@ class CollectionBuilder:
else:
self.methods.append((method_name, values))
elif method_name in util.all_lists:
self.methods.append((method_name, util.get_list(data[m])))
self.methods.append((method_name, util.get_list(method_data)))
elif method_name not in util.other_attributes:
raise Failed(f"Collection Error: {method_name} attribute not supported")
elif m in util.all_lists or m in util.method_alias or m in util.plex_searches:
raise Failed(f"Collection Error: {m} attribute is blank")
elif method_name in util.all_lists or method_name in util.method_alias or method_name in util.plex_searches:
raise Failed(f"Collection Error: {method_name} attribute is blank")
else:
logger.warning(f"Collection Warning: {m} attribute is blank")
logger.warning(f"Collection Warning: {method_name} attribute is blank")
self.sync = self.library.sync_mode == "sync"
if "sync_mode" in data:
if not data["sync_mode"]: logger.warning(f"Collection Warning: sync_mode attribute is blank using general: {self.library.sync_mode}")
elif data["sync_mode"] not in ["append", "sync"]: logger.warning(f"Collection Warning: {self.library.sync_mode} sync_mode invalid using general: {data['sync_mode']}")
else: self.sync = data["sync_mode"] == "sync"
if "sync_mode" in methods:
if not self.data[methods["sync_mode"]]:
logger.warning(f"Collection Warning: sync_mode attribute is blank using general: {self.library.sync_mode}")
elif self.data[methods["sync_mode"]].lower() not in ["append", "sync"]:
logger.warning(f"Collection Warning: {self.data[methods['sync_mode']]} sync_mode invalid using general: {self.library.sync_mode}")
else:
self.sync = self.data[methods["sync_mode"]].lower() == "sync"
self.do_arr = False
if self.library.Radarr:
@ -679,38 +804,31 @@ class CollectionBuilder:
items_found += len(items)
elif method == "plex_search":
search_terms = {}
title_searches = None
has_processed = False
for search_method, search_list in value:
if search_method == "title":
ors = ""
for o, param in enumerate(search_list):
ors += f"{' OR ' if o > 0 else ''}{param}"
title_searches = search_list
logger.info(f"Processing {pretty}: title({ors})")
has_processed = True
break
for search_method, search_list in value:
if search_method != "title":
final_method = search_method[:-4] + "!" if search_method[-4:] == ".not" else search_method
if self.library.is_show:
final_method = "show." + final_method
search_terms[final_method] = search_list
search_limit = None
search_sort = None
for search_method, search_data in value:
if search_method == "limit":
search_limit = search_data
elif search_method == "sort_by":
search_sort = util.plex_sort[search_data]
else:
search, modifier = os.path.splitext(str(search_method).lower())
final_search = util.search_alias[search] if search in util.search_alias else search
final_mod = util.plex_modifiers[modifier] if modifier in util.plex_modifiers else ""
final_method = f"{final_search}{final_mod}"
search_terms[final_method] = search_data * 60000 if final_search == "duration" else search_data
ors = ""
for o, param in enumerate(search_list):
or_des = " OR " if o > 0 else f"{search_method}("
conjunction = " AND " if final_mod == "&" else " OR "
for o, param in enumerate(search_data):
or_des = conjunction if o > 0 else f"{search_method}("
ors += f"{or_des}{param}"
if has_processed:
logger.info(f"\t\t AND {ors})")
else:
logger.info(f"Processing {pretty}: {ors})")
has_processed = True
if title_searches:
items = []
for title_search in title_searches:
items.extend(self.library.Plex.search(title_search, **search_terms))
else:
items = self.library.Plex.search(**search_terms)
items = self.library.Plex.search(sort=search_sort, maxresults=search_limit, **search_terms)
items_found += len(items)
elif method == "plex_collectionless":
good_collections = []
@ -883,7 +1001,7 @@ class CollectionBuilder:
if "label" in self.details:
item_labels = [label.tag for label in collection.labels]
labels = util.get_list(self.details["label"])
if "label_sync_mode" in self.details and self.details["label_sync_mode"] == "sync":
if "label_sync_mode" in self.details and str(self.details["label_sync_mode"]).lower() == "sync":
for label in (la for la in item_labels if la not in labels):
collection.removeLabel(label)
logger.info(f"Detail: Label {label} removed")
@ -942,7 +1060,7 @@ class CollectionBuilder:
logger.warning(f"No Folder: {os.path.join(path, folder)}")
def set_image(image_method, images, is_background=False):
if image_method in ['file_poster', 'asset_directory']:
if image_method in ["file_poster", "file_background", "asset_directory"]:
if is_background: collection.uploadArt(filepath=images[image_method])
else: collection.uploadPoster(filepath=images[image_method])
image_location = "File"

@ -106,8 +106,8 @@ class Config:
if isinstance(data[attribute], bool): return data[attribute]
else: message = f"{text} must be either true or false"
elif var_type == "int":
if isinstance(data[attribute], int) and data[attribute] > 0: return data[attribute]
else: message = f"{text} must an integer > 0"
if isinstance(data[attribute], int) and data[attribute] >= 0: return data[attribute]
else: message = f"{text} must an integer >= 0"
elif var_type == "path":
if os.path.exists(os.path.abspath(data[attribute])): return data[attribute]
else: message = f"Path {os.path.abspath(data[attribute])} does not exist"
@ -267,47 +267,47 @@ class Config:
self.libraries = []
try: libs = check_for_attribute(self.data, "libraries", throw=True)
except Failed as e: raise Failed(e)
for lib in libs:
for library_name, lib in libs.items():
util.separator()
params = {}
if "library_name" in libs[lib] and libs[lib]["library_name"]:
params["name"] = str(libs[lib]["library_name"])
logger.info(f"Connecting to {params['name']} ({lib}) Library...")
if "library_name" in lib and lib["library_name"]:
params["name"] = str(lib["library_name"])
logger.info(f"Connecting to {params['name']} ({library_name}) Library...")
else:
params["name"] = str(lib)
params["name"] = str(library_name)
logger.info(f"Connecting to {params['name']} Library...")
params["asset_directory"] = check_for_attribute(libs[lib], "asset_directory", parent="settings", var_type="list_path", default=self.general["asset_directory"], default_is_none=True, save=False)
params["asset_directory"] = check_for_attribute(lib, "asset_directory", parent="settings", var_type="list_path", default=self.general["asset_directory"], default_is_none=True, save=False)
if params["asset_directory"] is None:
logger.warning("Config Warning: Assets will not be used asset_directory attribute must be set under config or under this specific Library")
if "settings" in libs[lib] and libs[lib]["settings"] and "sync_mode" in libs[lib]["settings"]:
params["sync_mode"] = check_for_attribute(libs[lib], "sync_mode", parent="settings", test_list=["append", "sync"], options=" append (Only Add Items to the Collection)\n sync (Add & Remove Items from the Collection)", default=self.general["sync_mode"], do_print=False, save=False)
if "settings" in lib and lib["settings"] and "sync_mode" in lib["settings"]:
params["sync_mode"] = check_for_attribute(lib, "sync_mode", parent="settings", test_list=["append", "sync"], options=" append (Only Add Items to the Collection)\n sync (Add & Remove Items from the Collection)", default=self.general["sync_mode"], do_print=False, save=False)
else:
params["sync_mode"] = check_for_attribute(libs[lib], "sync_mode", test_list=["append", "sync"], options=" append (Only Add Items to the Collection)\n sync (Add & Remove Items from the Collection)", default=self.general["sync_mode"], do_print=False, save=False)
params["sync_mode"] = check_for_attribute(lib, "sync_mode", test_list=["append", "sync"], options=" append (Only Add Items to the Collection)\n sync (Add & Remove Items from the Collection)", default=self.general["sync_mode"], do_print=False, save=False)
if "settings" in libs[lib] and libs[lib]["settings"] and "show_unmanaged" in libs[lib]["settings"]:
params["show_unmanaged"] = check_for_attribute(libs[lib], "show_unmanaged", parent="settings", var_type="bool", default=self.general["show_unmanaged"], do_print=False, save=False)
if "settings" in lib and lib["settings"] and "show_unmanaged" in lib["settings"]:
params["show_unmanaged"] = check_for_attribute(lib, "show_unmanaged", parent="settings", var_type="bool", default=self.general["show_unmanaged"], do_print=False, save=False)
else:
params["show_unmanaged"] = check_for_attribute(libs[lib], "show_unmanaged", var_type="bool", default=self.general["show_unmanaged"], do_print=False, save=False)
params["show_unmanaged"] = check_for_attribute(lib, "show_unmanaged", var_type="bool", default=self.general["show_unmanaged"], do_print=False, save=False)
if "settings" in libs[lib] and libs[lib]["settings"] and "show_filtered" in libs[lib]["settings"]:
params["show_filtered"] = check_for_attribute(libs[lib], "show_filtered", parent="settings", var_type="bool", default=self.general["show_filtered"], do_print=False, save=False)
if "settings" in lib and lib["settings"] and "show_filtered" in lib["settings"]:
params["show_filtered"] = check_for_attribute(lib, "show_filtered", parent="settings", var_type="bool", default=self.general["show_filtered"], do_print=False, save=False)
else:
params["show_filtered"] = check_for_attribute(libs[lib], "show_filtered", var_type="bool", default=self.general["show_filtered"], do_print=False, save=False)
params["show_filtered"] = check_for_attribute(lib, "show_filtered", var_type="bool", default=self.general["show_filtered"], do_print=False, save=False)
if "settings" in libs[lib] and libs[lib]["settings"] and "show_missing" in libs[lib]["settings"]:
params["show_missing"] = check_for_attribute(libs[lib], "show_missing", parent="settings", var_type="bool", default=self.general["show_missing"], do_print=False, save=False)
if "settings" in lib and lib["settings"] and "show_missing" in lib["settings"]:
params["show_missing"] = check_for_attribute(lib, "show_missing", parent="settings", var_type="bool", default=self.general["show_missing"], do_print=False, save=False)
else:
params["show_missing"] = check_for_attribute(libs[lib], "show_missing", var_type="bool", default=self.general["show_missing"], do_print=False, save=False)
params["show_missing"] = check_for_attribute(lib, "show_missing", var_type="bool", default=self.general["show_missing"], do_print=False, save=False)
if "settings" in libs[lib] and libs[lib]["settings"] and "save_missing" in libs[lib]["settings"]:
params["save_missing"] = check_for_attribute(libs[lib], "save_missing", parent="settings", var_type="bool", default=self.general["save_missing"], do_print=False, save=False)
if "settings" in lib and lib["settings"] and "save_missing" in lib["settings"]:
params["save_missing"] = check_for_attribute(lib, "save_missing", parent="settings", var_type="bool", default=self.general["save_missing"], do_print=False, save=False)
else:
params["save_missing"] = check_for_attribute(libs[lib], "save_missing", var_type="bool", default=self.general["save_missing"], do_print=False, save=False)
params["save_missing"] = check_for_attribute(lib, "save_missing", var_type="bool", default=self.general["save_missing"], do_print=False, save=False)
if "mass_genre_update" in libs[lib] and libs[lib]["mass_genre_update"]:
params["mass_genre_update"] = check_for_attribute(libs[lib], "mass_genre_update", test_list=["tmdb", "omdb"], options=" tmdb (Use TMDb Metadata)\n omdb (Use IMDb Metadata through OMDb)", default_is_none=True, save=False)
if "mass_genre_update" in lib and lib["mass_genre_update"]:
params["mass_genre_update"] = check_for_attribute(lib, "mass_genre_update", test_list=["tmdb", "omdb"], options=" tmdb (Use TMDb Metadata)\n omdb (Use IMDb Metadata through OMDb)", default_is_none=True, save=False)
else:
params["mass_genre_update"] = None
@ -316,12 +316,12 @@ class Config:
logger.error("Config Error: mass_genre_update cannot be omdb without a successful OMDb Connection")
try:
params["metadata_path"] = check_for_attribute(libs[lib], "metadata_path", var_type="path", default=os.path.join(default_dir, f"{lib}.yml"), throw=True)
params["library_type"] = check_for_attribute(libs[lib], "library_type", test_list=["movie", "show"], options=" movie (For Movie Libraries)\n show (For Show Libraries)", throw=True)
params["metadata_path"] = check_for_attribute(lib, "metadata_path", var_type="path", default=os.path.join(default_dir, f"{library_name}.yml"), throw=True)
params["library_type"] = check_for_attribute(lib, "library_type", test_list=["movie", "show"], options=" movie (For Movie Libraries)\n show (For Show Libraries)", throw=True)
params["plex"] = {}
params["plex"]["url"] = check_for_attribute(libs[lib], "url", parent="plex", default=self.general["plex"]["url"], req_default=True, save=False)
params["plex"]["token"] = check_for_attribute(libs[lib], "token", parent="plex", default=self.general["plex"]["token"], req_default=True, save=False)
params["plex"]["timeout"] = check_for_attribute(libs[lib], "timeout", parent="plex", var_type="int", default=self.general["plex"]["timeout"], save=False)
params["plex"]["url"] = check_for_attribute(lib, "url", parent="plex", default=self.general["plex"]["url"], req_default=True, save=False)
params["plex"]["token"] = check_for_attribute(lib, "token", parent="plex", default=self.general["plex"]["token"], req_default=True, save=False)
params["plex"]["timeout"] = check_for_attribute(lib, "timeout", parent="plex", var_type="int", default=self.general["plex"]["timeout"], save=False)
library = PlexAPI(params, self.TMDb, self.TVDb)
logger.info(f"{params['name']} Library Connection Successful")
except Failed as e:
@ -329,47 +329,47 @@ class Config:
logger.info(f"{params['name']} Library Connection Failed")
continue
if self.general["radarr"]["url"] or "radarr" in libs[lib]:
if self.general["radarr"]["url"] or "radarr" in lib:
logger.info(f"Connecting to {params['name']} library's Radarr...")
radarr_params = {}
try:
radarr_params["url"] = check_for_attribute(libs[lib], "url", parent="radarr", default=self.general["radarr"]["url"], req_default=True, save=False)
radarr_params["token"] = check_for_attribute(libs[lib], "token", parent="radarr", default=self.general["radarr"]["token"], req_default=True, save=False)
radarr_params["version"] = check_for_attribute(libs[lib], "version", parent="radarr", test_list=["v2", "v3"], options=" v2 (For Radarr 0.2)\n v3 (For Radarr 3.0)", default=self.general["radarr"]["version"], save=False)
radarr_params["quality_profile"] = check_for_attribute(libs[lib], "quality_profile", parent="radarr", default=self.general["radarr"]["quality_profile"], req_default=True, save=False)
radarr_params["root_folder_path"] = check_for_attribute(libs[lib], "root_folder_path", parent="radarr", default=self.general["radarr"]["root_folder_path"], req_default=True, save=False)
radarr_params["add"] = check_for_attribute(libs[lib], "add", parent="radarr", var_type="bool", default=self.general["radarr"]["add"], save=False)
radarr_params["search"] = check_for_attribute(libs[lib], "search", parent="radarr", var_type="bool", default=self.general["radarr"]["search"], save=False)
radarr_params["tag"] = check_for_attribute(libs[lib], "search", parent="radarr", var_type="lower_list", default=self.general["radarr"]["tag"], default_is_none=True, save=False)
radarr_params["url"] = check_for_attribute(lib, "url", parent="radarr", default=self.general["radarr"]["url"], req_default=True, save=False)
radarr_params["token"] = check_for_attribute(lib, "token", parent="radarr", default=self.general["radarr"]["token"], req_default=True, save=False)
radarr_params["version"] = check_for_attribute(lib, "version", parent="radarr", test_list=["v2", "v3"], options=" v2 (For Radarr 0.2)\n v3 (For Radarr 3.0)", default=self.general["radarr"]["version"], save=False)
radarr_params["quality_profile"] = check_for_attribute(lib, "quality_profile", parent="radarr", default=self.general["radarr"]["quality_profile"], req_default=True, save=False)
radarr_params["root_folder_path"] = check_for_attribute(lib, "root_folder_path", parent="radarr", default=self.general["radarr"]["root_folder_path"], req_default=True, save=False)
radarr_params["add"] = check_for_attribute(lib, "add", parent="radarr", var_type="bool", default=self.general["radarr"]["add"], save=False)
radarr_params["search"] = check_for_attribute(lib, "search", parent="radarr", var_type="bool", default=self.general["radarr"]["search"], save=False)
radarr_params["tag"] = check_for_attribute(lib, "search", parent="radarr", var_type="lower_list", default=self.general["radarr"]["tag"], default_is_none=True, save=False)
library.Radarr = RadarrAPI(self.TMDb, radarr_params)
except Failed as e:
util.print_multiline(e)
logger.info(f"{params['name']} library's Radarr Connection {'Failed' if library.Radarr is None else 'Successful'}")
if self.general["sonarr"]["url"] or "sonarr" in libs[lib]:
if self.general["sonarr"]["url"] or "sonarr" in lib:
logger.info(f"Connecting to {params['name']} library's Sonarr...")
sonarr_params = {}
try:
sonarr_params["url"] = check_for_attribute(libs[lib], "url", parent="sonarr", default=self.general["sonarr"]["url"], req_default=True, save=False)
sonarr_params["token"] = check_for_attribute(libs[lib], "token", parent="sonarr", default=self.general["sonarr"]["token"], req_default=True, save=False)
sonarr_params["version"] = check_for_attribute(libs[lib], "version", parent="sonarr", test_list=["v2", "v3"], options=" v2 (For Sonarr 0.2)\n v3 (For Sonarr 3.0)", default=self.general["sonarr"]["version"], save=False)
sonarr_params["quality_profile"] = check_for_attribute(libs[lib], "quality_profile", parent="sonarr", default=self.general["sonarr"]["quality_profile"], req_default=True, save=False)
sonarr_params["root_folder_path"] = check_for_attribute(libs[lib], "root_folder_path", parent="sonarr", default=self.general["sonarr"]["root_folder_path"], req_default=True, save=False)
sonarr_params["add"] = check_for_attribute(libs[lib], "add", parent="sonarr", var_type="bool", default=self.general["sonarr"]["add"], save=False)
sonarr_params["search"] = check_for_attribute(libs[lib], "search", parent="sonarr", var_type="bool", default=self.general["sonarr"]["search"], save=False)
sonarr_params["season_folder"] = check_for_attribute(libs[lib], "season_folder", parent="sonarr", var_type="bool", default=self.general["sonarr"]["season_folder"], save=False)
sonarr_params["tag"] = check_for_attribute(libs[lib], "search", parent="sonarr", var_type="lower_list", default=self.general["sonarr"]["tag"], default_is_none=True, save=False)
sonarr_params["url"] = check_for_attribute(lib, "url", parent="sonarr", default=self.general["sonarr"]["url"], req_default=True, save=False)
sonarr_params["token"] = check_for_attribute(lib, "token", parent="sonarr", default=self.general["sonarr"]["token"], req_default=True, save=False)
sonarr_params["version"] = check_for_attribute(lib, "version", parent="sonarr", test_list=["v2", "v3"], options=" v2 (For Sonarr 0.2)\n v3 (For Sonarr 3.0)", default=self.general["sonarr"]["version"], save=False)
sonarr_params["quality_profile"] = check_for_attribute(lib, "quality_profile", parent="sonarr", default=self.general["sonarr"]["quality_profile"], req_default=True, save=False)
sonarr_params["root_folder_path"] = check_for_attribute(lib, "root_folder_path", parent="sonarr", default=self.general["sonarr"]["root_folder_path"], req_default=True, save=False)
sonarr_params["add"] = check_for_attribute(lib, "add", parent="sonarr", var_type="bool", default=self.general["sonarr"]["add"], save=False)
sonarr_params["search"] = check_for_attribute(lib, "search", parent="sonarr", var_type="bool", default=self.general["sonarr"]["search"], save=False)
sonarr_params["season_folder"] = check_for_attribute(lib, "season_folder", parent="sonarr", var_type="bool", default=self.general["sonarr"]["season_folder"], save=False)
sonarr_params["tag"] = check_for_attribute(lib, "search", parent="sonarr", var_type="lower_list", default=self.general["sonarr"]["tag"], default_is_none=True, save=False)
library.Sonarr = SonarrAPI(self.TVDb, sonarr_params, library.Plex.language)
except Failed as e:
util.print_multiline(e)
logger.info(f"{params['name']} library's Sonarr Connection {'Failed' if library.Sonarr is None else 'Successful'}")
if self.general["tautulli"]["url"] or "tautulli" in libs[lib]:
if self.general["tautulli"]["url"] or "tautulli" in lib:
logger.info(f"Connecting to {params['name']} library's Tautulli...")
tautulli_params = {}
try:
tautulli_params["url"] = check_for_attribute(libs[lib], "url", parent="tautulli", default=self.general["tautulli"]["url"], req_default=True, save=False)
tautulli_params["apikey"] = check_for_attribute(libs[lib], "apikey", parent="tautulli", default=self.general["tautulli"]["apikey"], req_default=True, save=False)
tautulli_params["url"] = check_for_attribute(lib, "url", parent="tautulli", default=self.general["tautulli"]["url"], req_default=True, save=False)
tautulli_params["apikey"] = check_for_attribute(lib, "apikey", parent="tautulli", default=self.general["tautulli"]["apikey"], req_default=True, save=False)
library.Tautulli = TautulliAPI(tautulli_params)
except Failed as e:
util.print_multiline(e)
@ -404,11 +404,11 @@ class Config:
util.separator(f"{library.name} Library {'Test ' if test else ''}Collections")
collections = {c: library.collections[c] for c in util.get_list(requested_collections) if c in library.collections} if requested_collections else library.collections
if collections:
for c in collections:
if test and ("test" not in collections[c] or collections[c]["test"] is not True):
for mapping_name, collection_attrs in collections.items():
if test and ("test" not in collection_attrs or collection_attrs["test"] is not True):
no_template_test = True
if "template" in collections[c] and collections[c]["template"]:
for data_template in util.get_list(collections[c]["template"], split=False):
if "template" in collection_attrs and collection_attrs["template"]:
for data_template in util.get_list(collection_attrs["template"], split=False):
if "name" in data_template \
and data_template["name"] \
and library.templates \
@ -421,13 +421,14 @@ class Config:
continue
try:
logger.info("")
util.separator(f"{c} Collection")
util.separator(f"{mapping_name} Collection")
logger.info("")
rating_key_map = {}
try:
builder = CollectionBuilder(self, library, c, collections[c])
builder = CollectionBuilder(self, library, mapping_name, collection_attrs)
except Failed as ef:
util.print_stacktrace()
util.print_multiline(ef, error=True)
continue
except Exception as ee:
@ -436,11 +437,11 @@ class Config:
continue
try:
collection_obj = library.get_collection(c)
collection_obj = library.get_collection(mapping_name)
collection_name = collection_obj.title
except Failed:
collection_obj = None
collection_name = c
collection_name = mapping_name
if len(builder.schedule) > 0:
util.print_multiline(builder.schedule, info=True)

@ -14,25 +14,34 @@ logger = logging.getLogger("Plex Meta Manager")
class PlexAPI:
def __init__(self, params, TMDb, TVDb):
try: self.PlexServer = PlexServer(params["plex"]["url"], params["plex"]["token"], timeout=params["plex"]["timeout"])
except Unauthorized: raise Failed("Plex Error: Plex token is invalid")
except ValueError as e: raise Failed(f"Plex Error: {e}")
try:
self.PlexServer = PlexServer(params["plex"]["url"], params["plex"]["token"], timeout=params["plex"]["timeout"])
except Unauthorized:
raise Failed("Plex Error: Plex token is invalid")
except ValueError as e:
raise Failed(f"Plex Error: {e}")
except requests.exceptions.ConnectionError:
util.print_stacktrace()
raise Failed("Plex Error: Plex url is invalid")
self.is_movie = params["library_type"] == "movie"
self.is_show = params["library_type"] == "show"
self.Plex = next((s for s in self.PlexServer.library.sections() if s.title == params["name"] and ((self.is_movie and isinstance(s, MovieSection)) or (self.is_show and isinstance(s, ShowSection)))), None)
if not self.Plex: raise Failed(f"Plex Error: Plex Library {params['name']} not found")
try: self.data, ind, bsi = yaml.util.load_yaml_guess_indent(open(params["metadata_path"], encoding="utf-8"))
except yaml.scanner.ScannerError as e: raise Failed(f"YAML Error: {util.tab_new_lines(e)}")
if not self.Plex:
raise Failed(f"Plex Error: Plex Library {params['name']} not found")
try:
self.data, ind, bsi = yaml.util.load_yaml_guess_indent(open(params["metadata_path"], encoding="utf-8"))
except yaml.scanner.ScannerError as e:
raise Failed(f"YAML Error: {util.tab_new_lines(e)}")
def get_dict(attribute):
if attribute in self.data:
if self.data[attribute]:
if isinstance(self.data[attribute], dict): return self.data[attribute]
else: logger.warning(f"Config Warning: {attribute} must be a dictionary")
else: logger.warning(f"Config Warning: {attribute} attribute is blank")
if isinstance(self.data[attribute], dict):
return self.data[attribute]
else:
logger.warning(f"Config Warning: {attribute} must be a dictionary")
else:
logger.warning(f"Config Warning: {attribute} attribute is blank")
return None
self.metadata = get_dict("metadata")
@ -81,6 +90,21 @@ class PlexAPI:
def server_search(self, data):
return self.PlexServer.search(data)
def get_search_choices(self, search_name, key=False):
if key: return {c.key.lower(): c.key for c in self.Plex.listFilterChoices(search_name)}
else: return {c.title.lower(): c.title for c in self.Plex.listFilterChoices(search_name)}
def validate_search_list(self, data, search_name):
final_search = util.search_alias[search_name] if search_name in util.search_alias else search_name
search_choices = self.get_search_choices(final_search, key=final_search.endswith("Language"))
valid_list = []
for value in util.get_list(data):
if str(value).lower in search_choices:
valid_list.append(search_choices[str(value).lower])
else:
raise Failed(f"Plex Error: No {search_name}: {value} found")
return valid_list
def get_all_collections(self):
return self.Plex.search(libtype="collection")
@ -133,11 +157,7 @@ class PlexAPI:
for filter_method, filter_data in filters:
modifier = filter_method[-4:]
method = filter_method[:-4] if modifier in [".not", ".lte", ".gte"] else filter_method
if method in util.method_alias:
method_name = util.method_alias[method]
logger.warning(f"Collection Warning: {method} attribute will run as {method_name}")
else:
method_name = method
method_name = util.filter_alias[method] if method in util.filter_alias else method
if method_name == "max_age":
threshold_date = datetime.now() - timedelta(days=filter_data)
if current.originallyAvailableAt is None or current.originallyAvailableAt < threshold_date:
@ -197,12 +217,19 @@ class PlexAPI:
attrs = []
if method_name in ["video_resolution", "audio_language", "subtitle_language"]:
for media in current.media:
if method_name == "video_resolution": attrs.extend([media.videoResolution])
if method_name == "video_resolution":
attrs.extend([media.videoResolution])
for part in media.parts:
if method_name == "audio_language": attrs.extend([a.language for a in part.audioStreams()])
if method_name == "subtitle_language": attrs.extend([s.language for s in part.subtitleStreams()])
elif method_name in ["contentRating", "studio", "year", "rating", "originallyAvailableAt"]: attrs = [str(getattr(current, method_name))]
elif method_name in ["actors", "countries", "directors", "genres", "writers", "collections"]: attrs = [getattr(x, "tag") for x in getattr(current, method_name)]
if method_name == "audio_language":
attrs.extend([a.language for a in part.audioStreams()])
if method_name == "subtitle_language":
attrs.extend([s.language for s in part.subtitleStreams()])
elif method_name in ["contentRating", "studio", "year", "rating", "originallyAvailableAt"]:
attrs = [str(getattr(current, method_name))]
elif method_name in ["actors", "countries", "directors", "genres", "writers", "collections"]:
attrs = [getattr(x, "tag") for x in getattr(current, method_name)]
else:
raise Failed(f"Filter Error: filter: {method_name} not supported")
if (not list(set(filter_data) & set(attrs)) and modifier != ".not") or (list(set(filter_data) & set(attrs)) and modifier == ".not"):
match = False
@ -227,36 +254,37 @@ class PlexAPI:
logger.info("")
if not self.metadata:
raise Failed("No metadata to edit")
for m in self.metadata:
if test and ("test" not in self.metadata[m] or self.metadata[m]["test"] is not True):
for mapping_name, meta in self.metadata.items():
methods = {mm.lower(): mm for mm in meta}
if test and ("test" not in methods or meta[methods["test"]] is not True):
continue
logger.info("")
util.separator()
logger.info("")
year = None
if "year" in self.metadata[m]:
year = util.check_number(self.metadata[m]["year"], "year", minimum=1800, maximum=datetime.now().year + 1)
if "year" in methods:
year = util.check_number(meta[methods["year"]], "year", minimum=1800, maximum=datetime.now().year + 1)
title = m
if "title" in self.metadata[m]:
if self.metadata[m]["title"] is None: logger.error("Metadata Error: title attribute is blank")
else: title = self.metadata[m]["title"]
title = mapping_name
if "title" in methods:
if meta[methods["title"]] is None: logger.error("Metadata Error: title attribute is blank")
else: title = meta[methods["title"]]
item = self.search_item(title, year=year)
if item is None:
item = self.search_item(f"{title} (SUB)", year=year)
if item is None and "alt_title" in self.metadata[m]:
if self.metadata[m]["alt_title"] is None:
if item is None and "alt_title" in methods:
if meta[methods["alt_title"]] is None:
logger.error("Metadata Error: alt_title attribute is blank")
else:
alt_title = self.metadata[m]["alt_title"]
alt_title = meta["alt_title"]
item = self.search_item(alt_title, year=year)
if item is None:
logger.error(f"Plex Error: Item {m} not found")
logger.error(f"Skipping {m}")
logger.error(f"Plex Error: Item {mapping_name} not found")
logger.error(f"Skipping {mapping_name}")
continue
item_type = "Movie" if self.is_movie else "Show"
@ -264,10 +292,10 @@ class PlexAPI:
tmdb_item = None
try:
if "tmdb_id" in self.metadata[m]:
if self.metadata[m]["tmdb_id"] is None: logger.error("Metadata Error: tmdb_id attribute is blank")
if "tmdb_id" in methods:
if meta[methods["tmdb_id"]] is None: logger.error("Metadata Error: tmdb_id attribute is blank")
elif self.is_show: logger.error("Metadata Error: tmdb_id attribute only works with movie libraries")
else: tmdb_item = TMDb.get_show(util.regex_first_int(self.metadata[m]["tmdb_id"], "Show"))
else: tmdb_item = TMDb.get_show(util.regex_first_int(meta[methods["tmdb_id"]], "Show"))
except Failed as e:
logger.error(e)
@ -279,53 +307,203 @@ class PlexAPI:
summary = tmdb_item.overview if tmdb_item else None
edits = {}
def add_edit(name, current, group, key=None, value=None):
if value or name in group:
if value or group[name]:
def add_edit(name, current, group, alias, key=None, value=None):
if value or name in alias:
if value or group[alias[name]]:
if key is None: key = name
if value is None: value = group[name]
if value is None: value = group[alias[name]]
if str(current) != str(value):
edits[f"{key}.value"] = value
edits[f"{key}.locked"] = 1
logger.info(f"Detail: {name} updated to {value}")
else:
logger.error(f"Metadata Error: {name} attribute is blank")
add_edit("title", item.title, self.metadata[m], value=title)
add_edit("sort_title", item.titleSort, self.metadata[m], key="titleSort")
add_edit("originally_available", str(item.originallyAvailableAt)[:-9], self.metadata[m], key="originallyAvailableAt", value=originally_available)
add_edit("rating", item.rating, self.metadata[m], value=rating)
add_edit("content_rating", item.contentRating, self.metadata[m], key="contentRating")
add_edit("original_title", item.originalTitle, self.metadata[m], key="originalTitle", value=original_title)
add_edit("studio", item.studio, self.metadata[m], value=studio)
add_edit("tagline", item.tagline, self.metadata[m], value=tagline)
add_edit("summary", item.summary, self.metadata[m], value=summary)
add_edit("title", item.title, meta, methods, value=title)
add_edit("sort_title", item.titleSort, meta, methods, key="titleSort")
add_edit("originally_available", str(item.originallyAvailableAt)[:-9], meta, methods, key="originallyAvailableAt", value=originally_available)
add_edit("rating", item.rating, meta, methods, value=rating)
add_edit("content_rating", item.contentRating, meta, methods, key="contentRating")
add_edit("original_title", item.originalTitle, meta, methods, key="originalTitle", value=original_title)
add_edit("studio", item.studio, meta, methods, value=studio)
add_edit("tagline", item.tagline, meta, methods, value=tagline)
add_edit("summary", item.summary, meta, methods, value=summary)
if len(edits) > 0:
logger.debug(f"Details Update: {edits}")
try:
item.edit(**edits)
item.reload()
logger.info(f"{item_type}: {m} Details Update Successful")
logger.info(f"{item_type}: {mapping_name} Details Update Successful")
except BadRequest:
util.print_stacktrace()
logger.error(f"{item_type}: {mapping_name} Details Update Failed")
else:
logger.info(f"{item_type}: {mapping_name} Details Update Not Needed")
advance_edits = {}
if self.is_show:
if "episode_sorting" in methods:
if meta[methods["episode_sorting"]]:
method_data = str(meta[methods["episode_sorting"]]).lower()
if method_data in ["default", "oldest", "newest"]:
if method_data == "default" and item.episodeSort != "-1":
advance_edits["episodeSort"] = "-1"
elif method_data == "oldest" and item.episodeSort != "0":
advance_edits["episodeSort"] = "0"
elif method_data == "newest" and item.episodeSort != "1":
advance_edits["episodeSort"] = "1"
if "episodeSort" in advance_edits:
logger.info(f"Detail: episode_sorting updated to {method_data}")
else:
logger.error(f"Metadata Error: {meta[methods['episode_sorting']]} episode_sorting attribute invalid")
else:
logger.error(f"Metadata Error: episode_sorting attribute is blank")
if "keep_episodes" in methods:
if meta[methods["keep_episodes"]]:
method_data = str(meta[methods["keep_episodes"]]).lower()
if method_data in ["all", "5_latest", "3_latest", "latest", "past_3", "past_7", "past_30"]:
if method_data == "all" and item.autoDeletionItemPolicyUnwatchedLibrary != 0:
advance_edits["autoDeletionItemPolicyUnwatchedLibrary"] = 0
elif method_data == "5_latest" and item.autoDeletionItemPolicyUnwatchedLibrary != 5:
advance_edits["autoDeletionItemPolicyUnwatchedLibrary"] = 5
elif method_data == "3_latest" and item.autoDeletionItemPolicyUnwatchedLibrary != 3:
advance_edits["autoDeletionItemPolicyUnwatchedLibrary"] = 3
elif method_data == "latest" and item.autoDeletionItemPolicyUnwatchedLibrary != 1:
advance_edits["autoDeletionItemPolicyUnwatchedLibrary"] = 1
elif method_data == "past_3" and item.autoDeletionItemPolicyUnwatchedLibrary != -3:
advance_edits["autoDeletionItemPolicyUnwatchedLibrary"] = -3
elif method_data == "past_7" and item.autoDeletionItemPolicyUnwatchedLibrary != -7:
advance_edits["autoDeletionItemPolicyUnwatchedLibrary"] = -7
elif method_data == "past_30" and item.autoDeletionItemPolicyUnwatchedLibrary != -30:
advance_edits["autoDeletionItemPolicyUnwatchedLibrary"] = -30
if "autoDeletionItemPolicyUnwatchedLibrary" in advance_edits:
logger.info(f"Detail: keep_episodes updated to {method_data}")
else:
logger.error(f"Metadata Error: {meta[methods['keep_episodes']]} keep_episodes attribute invalid")
else:
logger.error(f"Metadata Error: keep_episodes attribute is blank")
if "delete_episodes" in methods:
if meta[methods["delete_episodes"]]:
method_data = str(meta[methods["delete_episodes"]]).lower()
if method_data in ["never", "day", "week", "refresh"]:
if method_data == "never" and item.autoDeletionItemPolicyWatchedLibrary != 0:
advance_edits["autoDeletionItemPolicyWatchedLibrary"] = 0
elif method_data == "day" and item.autoDeletionItemPolicyWatchedLibrary != 1:
advance_edits["autoDeletionItemPolicyWatchedLibrary"] = 1
elif method_data == "week" and item.autoDeletionItemPolicyWatchedLibrary != 7:
advance_edits["autoDeletionItemPolicyWatchedLibrary"] = 7
elif method_data == "refresh" and item.autoDeletionItemPolicyWatchedLibrary != 100:
advance_edits["autoDeletionItemPolicyWatchedLibrary"] = 100
if "autoDeletionItemPolicyWatchedLibrary" in advance_edits:
logger.info(f"Detail: delete_episodes updated to {method_data}")
else:
logger.error(f"Metadata Error: {meta[methods['delete_episodes']]} delete_episodes attribute invalid")
else:
logger.error(f"Metadata Error: delete_episodes attribute is blank")
if "season_display" in methods:
if meta[methods["season_display"]]:
method_data = str(meta[methods["season_display"]]).lower()
if method_data in ["default", "hide", "show"]:
if method_data == "default" and item.flattenSeasons != -1:
advance_edits["flattenSeasons"] = -1
elif method_data == "show" and item.flattenSeasons != 0:
advance_edits["flattenSeasons"] = 0
elif method_data == "hide" and item.flattenSeasons != 1:
advance_edits["flattenSeasons"] = 1
if "flattenSeasons" in advance_edits:
logger.info(f"Detail: season_display updated to {method_data}")
else:
logger.error(f"Metadata Error: {meta[methods['season_display']]} season_display attribute invalid")
else:
logger.error(f"Metadata Error: season_display attribute is blank")
if "episode_ordering" in methods:
if meta[methods["episode_ordering"]]:
method_data = str(meta[methods["episode_ordering"]]).lower()
if method_data in ["default", "tmdb_aired", "tvdb_aired", "tvdb_dvd", "tvdb_absolute"]:
if method_data == "default" and item.showOrdering is not None:
advance_edits["showOrdering"] = None
elif method_data == "tmdb_aired" and item.showOrdering != "tmdbAiring":
advance_edits["showOrdering"] = "tmdbAiring"
elif method_data == "tvdb_aired" and item.showOrdering != "airing":
advance_edits["showOrdering"] = "airing"
elif method_data == "tvdb_dvd" and item.showOrdering != "dvd":
advance_edits["showOrdering"] = "dvd"
elif method_data == "tvdb_absolute" and item.showOrdering != "absolute":
advance_edits["showOrdering"] = "absolute"
if "showOrdering" in advance_edits:
logger.info(f"Detail: episode_ordering updated to {method_data}")
else:
logger.error(f"Metadata Error: {meta[methods['episode_ordering']]} episode_ordering attribute invalid")
else:
logger.error(f"Metadata Error: episode_ordering attribute is blank")
if "metadata_language" in methods:
if meta[methods["metadata_language"]]:
method_data = str(meta[methods["metadata_language"]]).lower()
lower_languages = {la.lower(): la for la in util.plex_languages}
if method_data in lower_languages:
if method_data == "default" and item.languageOverride is None:
advance_edits["languageOverride"] = None
elif str(item.languageOverride).lower() != lower_languages[method_data]:
advance_edits["languageOverride"] = lower_languages[method_data]
if "languageOverride" in advance_edits:
logger.info(f"Detail: metadata_language updated to {method_data}")
else:
logger.error(f"Metadata Error: {meta[methods['metadata_language']]} metadata_language attribute invalid")
else:
logger.error(f"Metadata Error: metadata_language attribute is blank")
if "use_original_title" in methods:
if meta[methods["use_original_title"]]:
method_data = str(meta[methods["use_original_title"]]).lower()
if method_data in ["default", "no", "yes"]:
if method_data == "default" and item.useOriginalTitle != -1:
advance_edits["useOriginalTitle"] = -1
elif method_data == "no" and item.useOriginalTitle != 0:
advance_edits["useOriginalTitle"] = 0
elif method_data == "yes" and item.useOriginalTitle != 1:
advance_edits["useOriginalTitle"] = 1
if "useOriginalTitle" in advance_edits:
logger.info(f"Detail: use_original_title updated to {method_data}")
else:
logger.error(f"Metadata Error: {meta[methods['use_original_title']]} use_original_title attribute invalid")
else:
logger.error(f"Metadata Error: use_original_title attribute is blank")
if len(advance_edits) > 0:
logger.debug(f"Details Update: {advance_edits}")
try:
check_dict = {pref.id: list(pref.enumValues.keys()) for pref in item.preferences()}
logger.info(check_dict)
item.editAdvanced(**advance_edits)
item.reload()
logger.info(f"{item_type}: {mapping_name} Advanced Details Update Successful")
except BadRequest:
util.print_stacktrace()
logger.error(f"{item_type}: {m} Details Update Failed")
logger.error(f"{item_type}: {mapping_name} Details Update Failed")
else:
logger.info(f"{item_type}: {m} Details Update Not Needed")
logger.info(f"{item_type}: {mapping_name} Details Update Not Needed")
genres = []
if tmdb_item:
genres.extend([genre.name for genre in tmdb_item.genres])
if "genre" in self.metadata[m]:
if self.metadata[m]["genre"]: genres.extend(util.get_list(self.metadata[m]["genre"]))
else: logger.error("Metadata Error: genre attribute is blank")
if "genre" in methods:
if meta[methods["genre"]]:
genres.extend(util.get_list(meta[methods["genre"]]))
else:
logger.error("Metadata Error: genre attribute is blank")
if len(genres) > 0:
item_genres = [genre.tag for genre in item.genres]
if "genre_sync_mode" in self.metadata[m]:
if self.metadata[m]["genre_sync_mode"] is None: logger.error("Metadata Error: genre_sync_mode attribute is blank defaulting to append")
elif self.metadata[m]["genre_sync_mode"] not in ["append", "sync"]: logger.error("Metadata Error: genre_sync_mode attribute must be either 'append' or 'sync' defaulting to append")
elif self.metadata[m]["genre_sync_mode"] == "sync":
if "genre_sync_mode" in methods:
if meta[methods["genre_sync_mode"]] is None:
logger.error("Metadata Error: genre_sync_mode attribute is blank defaulting to append")
elif str(meta[methods["genre_sync_mode"]]).lower() not in ["append", "sync"]:
logger.error("Metadata Error: genre_sync_mode attribute must be either 'append' or 'sync' defaulting to append")
elif str(meta["genre_sync_mode"]).lower() == "sync":
for genre in (g for g in item_genres if g not in genres):
item.removeGenre(genre)
logger.info(f"Detail: Genre {genre} removed")
@ -333,14 +511,16 @@ class PlexAPI:
item.addGenre(genre)
logger.info(f"Detail: Genre {genre} added")
if "label" in self.metadata[m]:
if self.metadata[m]["label"]:
if "label" in methods:
if meta[methods["label"]]:
item_labels = [label.tag for label in item.labels]
labels = util.get_list(self.metadata[m]["label"])
if "label_sync_mode" in self.metadata[m]:
if self.metadata[m]["label_sync_mode"] is None: logger.error("Metadata Error: label_sync_mode attribute is blank defaulting to append")
elif self.metadata[m]["label_sync_mode"] not in ["append", "sync"]: logger.error("Metadata Error: label_sync_mode attribute must be either 'append' or 'sync' defaulting to append")
elif self.metadata[m]["label_sync_mode"] == "sync":
labels = util.get_list(meta[methods["label"]])
if "label_sync_mode" in methods:
if meta[methods["label_sync_mode"]] is None:
logger.error("Metadata Error: label_sync_mode attribute is blank defaulting to append")
elif str(meta[methods["label_sync_mode"]]).lower() not in ["append", "sync"]:
logger.error("Metadata Error: label_sync_mode attribute must be either 'append' or 'sync' defaulting to append")
elif str(meta[methods["label_sync_mode"]]).lower() == "sync":
for label in (la for la in item_labels if la not in labels):
item.removeLabel(label)
logger.info(f"Detail: Label {label} removed")
@ -350,33 +530,35 @@ class PlexAPI:
else:
logger.error("Metadata Error: label attribute is blank")
if "seasons" in self.metadata[m] and self.is_show:
if self.metadata[m]["seasons"]:
for season_id in self.metadata[m]["seasons"]:
if "seasons" in methods and self.is_show:
if meta[methods["seasons"]]:
for season_id in meta[methods["seasons"]]:
logger.info("")
logger.info(f"Updating season {season_id} of {m}...")
logger.info(f"Updating season {season_id} of {mapping_name}...")
if isinstance(season_id, int):
try: season = item.season(season_id)
except NotFound: logger.error(f"Metadata Error: Season: {season_id} not found")
else:
season_dict = meta[methods["seasons"]][season_id]
season_methods = {sm.lower(): sm for sm in season_dict}
if "title" in self.metadata[m]["seasons"][season_id] and self.metadata[m]["seasons"][season_id]["title"]:
title = self.metadata[m]["seasons"][season_id]["title"]
if "title" in season_methods and season_dict[season_methods["title"]]:
title = season_dict[season_methods["title"]]
else:
title = season.title
if "sub" in self.metadata[m]["seasons"][season_id]:
if self.metadata[m]["seasons"][season_id]["sub"] is None:
if "sub" in season_methods:
if season_dict[season_methods["sub"]] is None:
logger.error("Metadata Error: sub attribute is blank")
elif self.metadata[m]["seasons"][season_id]["sub"] is True and "(SUB)" not in title:
elif season_dict[season_methods["sub"]] is True and "(SUB)" not in title:
title = f"{title} (SUB)"
elif self.metadata[m]["seasons"][season_id]["sub"] is False and title.endswith(" (SUB)"):
elif season_dict[season_methods["sub"]] is False and title.endswith(" (SUB)"):
title = title[:-6]
else:
logger.error("Metadata Error: sub attribute must be True or False")
edits = {}
add_edit("title", season.title, self.metadata[m]["seasons"][season_id], value=title)
add_edit("summary", season.summary, self.metadata[m]["seasons"][season_id])
add_edit("title", season.title, season_dict, season_methods, value=title)
add_edit("summary", season.summary, season_methods, season_dict)
if len(edits) > 0:
logger.debug(f"Season: {season_id} Details Update: {edits}")
try:
@ -393,38 +575,41 @@ class PlexAPI:
else:
logger.error("Metadata Error: seasons attribute is blank")
if "episodes" in self.metadata[m] and self.is_show:
if self.metadata[m]["episodes"]:
for episode_str in self.metadata[m]["episodes"]:
if "episodes" in methods and self.is_show:
if meta[methods["episodes"]]:
for episode_str in meta[methods["episodes"]]:
logger.info("")
match = re.search("[Ss]\\d+[Ee]\\d+", episode_str)
if match:
output = match.group(0)[1:].split("E" if "E" in match.group(0) else "e")
episode_id = int(output[0])
season_id = int(output[1])
logger.info(f"Updating episode S{episode_id}E{season_id} of {m}...")
logger.info(f"Updating episode S{episode_id}E{season_id} of {mapping_name}...")
try: episode = item.episode(season=season_id, episode=episode_id)
except NotFound: logger.error(f"Metadata Error: episode {episode_id} of season {season_id} not found")
else:
if "title" in self.metadata[m]["episodes"][episode_str] and self.metadata[m]["episodes"][episode_str]["title"]:
title = self.metadata[m]["episodes"][episode_str]["title"]
episode_dict = meta[methods["episodes"]][episode_str]
episode_methods = {em.lower(): em for em in episode_dict}
if "title" in episode_methods and episode_dict[episode_methods["title"]]:
title = episode_dict[episode_methods["title"]]
else:
title = episode.title
if "sub" in self.metadata[m]["episodes"][episode_str]:
if self.metadata[m]["episodes"][episode_str]["sub"] is None:
if "sub" in episode_dict:
if episode_dict[episode_methods["sub"]] is None:
logger.error("Metadata Error: sub attribute is blank")
elif self.metadata[m]["episodes"][episode_str]["sub"] is True and "(SUB)" not in title:
elif episode_dict[episode_methods["sub"]] is True and "(SUB)" not in title:
title = f"{title} (SUB)"
elif self.metadata[m]["episodes"][episode_str]["sub"] is False and title.endswith(" (SUB)"):
elif episode_dict[episode_methods["sub"]] is False and title.endswith(" (SUB)"):
title = title[:-6]
else:
logger.error("Metadata Error: sub attribute must be True or False")
edits = {}
add_edit("title", episode.title, self.metadata[m]["episodes"][episode_str], value=title)
add_edit("sort_title", episode.titleSort, self.metadata[m]["episodes"][episode_str], key="titleSort")
add_edit("rating", episode.rating, self.metadata[m]["episodes"][episode_str])
add_edit("originally_available", str(episode.originallyAvailableAt)[:-9], self.metadata[m]["episodes"][episode_str], key="originallyAvailableAt")
add_edit("summary", episode.summary, self.metadata[m]["episodes"][episode_str])
add_edit("title", episode.title, episode_dict, episode_methods, value=title)
add_edit("sort_title", episode.titleSort, episode_dict, episode_methods, key="titleSort")
add_edit("rating", episode.rating, episode_dict, episode_methods)
add_edit("originally_available", str(episode.originallyAvailableAt)[:-9], episode_dict, episode_methods, key="originallyAvailableAt")
add_edit("summary", episode.summary, episode_dict, episode_methods)
if len(edits) > 0:
logger.debug(f"Season: {season_id} Episode: {episode_id} Details Update: {edits}")
try:

@ -1,4 +1,5 @@
import logging, tmdbv3api
from datetime import datetime
from modules import util
from modules.util import Failed
from retrying import retry
@ -155,6 +156,9 @@ class TMDbAPI:
def get_discover(self, attrs, amount, is_movie):
ids = []
count = 0
for date_attr in util.discover_dates:
if date_attr in attrs:
attrs[date_attr] = datetime.strftime(datetime.strptime(attrs[date_attr], "%m/%d/%Y"), "%Y-%m-%d")
self.Discover.discover_movies(attrs) if is_movie else self.Discover.discover_tv_shows(attrs)
total_pages = int(self.TMDb.total_pages)
total_results = int(self.TMDb.total_results)

@ -112,7 +112,7 @@ class TraktAPI:
return requests.get(url, headers={"Content-Type": "application/json", "trakt-api-version": "2", "trakt-api-key": self.client_id}).json()
def get_pagenation(self, pagenation, amount, is_movie):
items = self.send_request(f"{self.base_url}/{'movies' if not is_movie else 'shows'}/{pagenation}?limit={amount}")
items = self.send_request(f"{self.base_url}/{'movies' if is_movie else 'shows'}/{pagenation}?limit={amount}")
if pagenation == "popular" and is_movie: return [item["ids"]["tmdb"] for item in items], []
elif pagenation == "popular": return [], [item["ids"]["tvdb"] for item in items]
elif is_movie: return [item["movie"]["ids"]["tmdb"] for item in items], []

@ -29,11 +29,20 @@ method_alias = {
"decades": "decade",
"directors": "director",
"genres": "genre",
"labels": "label",
"studios": "studio", "network": "studio", "networks": "studio",
"producers": "producer",
"writers": "writer",
"years": "year"
}
search_alias = {
"audio_language": "audioLanguage",
"content_rating": "contentRating",
"subtitle_language": "subtitleLanguage",
"added": "addedAt",
"originally_available": "originallyAvailableAt",
"rating": "userRating"
}
filter_alias = {
"actor": "actors",
"collection": "collections",
@ -87,11 +96,13 @@ pretty_names = {
"anidb_id": "AniDB ID",
"anidb_relation": "AniDB Relation",
"anidb_popular": "AniDB Popular",
"anilist_genre": "AniList Genre",
"anilist_id": "AniList ID",
"anilist_popular": "AniList Popular",
"anilist_relations": "AniList Relations",
"anilist_season": "AniList Season",
"anilist_studio": "AniList Studio",
"anilist_tag": "AniList Tag",
"anilist_top_rated": "AniList Top Rated",
"imdb_list": "IMDb List",
"imdb_id": "IMDb ID",
@ -158,6 +169,10 @@ pretty_names = {
"tvdb_show": "TVDb Show",
"tvdb_show_details": "TVDb Show"
}
plex_languages = ["default", "ar-SA", "ca-ES", "cs-CZ", "da-DK", "de-DE", "el-GR", "en-AU", "en-CA", "en-GB", "en-US", "es-ES",
"es-MX", "et-EE", "fa-IR", "fi-FI", "fr-CA", "fr-FR", "he-IL", "hi-IN", "hu-HU", "id-ID", "it-IT",
"ja-JP", "ko-KR", "lt-LT", "lv-LV", "nb-NO", "nl-NL", "pl-PL", "pt-BR", "pt-PT", "ro-RO", "ru-RU",
"sk-SK", "sv-SE", "th-TH", "tr-TR", "uk-UA", "vi-VN", "zh-CN", "zh-HK", "zh-TW"]
mal_ranked_name = {
"mal_all": "all",
"mal_airing": "airing",
@ -224,11 +239,13 @@ all_lists = [
"anidb_id",
"anidb_relation",
"anidb_popular",
"anilist_genre",
"anilist_id",
"anilist_popular",
"anilist_relations",
"anilist_season",
"anilist_studio",
"anilist_tag",
"anilist_top_rated",
"imdb_list",
"imdb_id",
@ -311,7 +328,9 @@ other_attributes = [
]
dictionary_lists = [
"filters",
"anilist_genre",
"anilist_season",
"anilist_tag",
"mal_season",
"mal_userlist",
"plex_collectionless",
@ -320,18 +339,6 @@ dictionary_lists = [
"tautulli_watched",
"tmdb_discover"
]
plex_searches = [
"actor", #"actor.not", # Waiting on PlexAPI to fix issue
"country", #"country.not",
"decade", #"decade.not",
"director", #"director.not",
"genre", #"genre.not",
"producer", #"producer.not",
"studio", #"studio.not",
"title",
"writer", #"writer.not"
"year" #"year.not",
]
show_only_lists = [
"tmdb_network",
"tmdb_show",
@ -350,20 +357,6 @@ movie_only_lists = [
"tvdb_movie",
"tvdb_movie_details"
]
movie_only_searches = [
"actor", "actor.not",
"country", "country.not",
"decade", "decade.not",
"director", "director.not",
"producer", "producer.not",
"writer", "writer.not"
]
tmdb_searches = [
"actor", "actor.not",
"director", "director.not",
"producer", "producer.not",
"writer", "writer.not"
]
count_lists = [
"anidb_popular",
"anilist_popular",
@ -442,6 +435,59 @@ tmdb_type = {
"tmdb_writer": "Person",
"tmdb_writer_details": "Person"
}
plex_searches = [
"title", "title.and", "title.not", "title.begins", "title.ends",
"studio", "studio.and", "studio.not", "studio.begins", "studio.ends",
"actor", "actor.and", "actor.not",
"audio_language", "audio_language.and", "audio_language.not",
"collection", "collection.and", "collection.not",
"content_rating", "content_rating.and", "content_rating.not",
"country", "country.and", "country.not",
"director", "director.and", "director.not",
"genre", "genre.and", "genre.not",
"label", "label.and", "label.not",
"producer", "producer.and", "producer.not",
"subtitle_language", "subtitle_language.and", "subtitle_language.not",
"writer", "writer.and", "writer.not",
"decade", "resolution",
"added.before", "added.after",
"originally_available.before", "originally_available.after",
"duration.greater", "duration.less",
"rating.greater", "rating.less",
"year", "year.not", "year.greater", "year.less"
]
plex_sort = {
"title.asc": "titleSort:asc", "title.desc": "titleSort:desc",
"originally_available.asc": "originallyAvailableAt:asc", "originally_available.desc": "originallyAvailableAt:desc",
"critic_rating.asc": "rating:asc", "critic_rating.desc": "rating:desc",
"audience_rating.asc": "audienceRating:asc", "audience_rating.desc": "audienceRating:desc",
"duration.asc": "duration:asc", "duration.desc": "duration:desc",
"added.asc": "addedAt:asc", "added.desc": "addedAt:desc"
}
plex_modifiers = {
".and": "&",
".not": "!",
".begins": "<",
".ends": ">",
".before": "<<",
".after": ">>",
".greater": ">>",
".less": "<<"
}
movie_only_searches = [
"audio_language", "audio_language.and", "audio_language.not",
"country", "country.and", "country.not",
"subtitle_language", "subtitle_language.and", "subtitle_language.not",
"decade", "resolution",
"originally_available.before", "originally_available.after",
"duration.greater", "duration.less"
]
tmdb_searches = [
"actor", "actor.and", "actor.not",
"director", "director.and", "director.not",
"producer", "producer.and", "producer.not",
"writer", "writer.and", "writer.not"
]
all_filters = [
"actor", "actor.not",
"audio_language", "audio_language.not",
@ -516,6 +562,12 @@ discover_tv = [
"include_null_first_air_dates",
"screened_theatrically"
]
discover_dates = [
"primary_release_date.gte", "primary_release_date.lte",
"release_date.gte", "release_date.lte",
"air_date.gte", "air_date.lte",
"first_air_date.gte", "first_air_date.lte"
]
discover_movie_sort = [
"popularity.asc", "popularity.desc",
"release_date.asc", "release_date.desc",
@ -596,25 +648,11 @@ def get_int_list(data, id_type):
except Failed as e: logger.error(e)
return int_values
def get_year_list(data, method):
values = get_list(data)
def get_year_list(data, current_year, method):
final_years = []
current_year = datetime.now().year
values = get_list(data)
for value in values:
try:
if "-" in value:
year_range = re.search("(\\d{4})-(\\d{4}|NOW)", str(value))
start = check_year(year_range.group(1), current_year, method)
end = current_year if year_range.group(2) == "NOW" else check_year(year_range.group(2), current_year, method)
if int(start) > int(end):
raise Failed(f"Collection Error: {method} starting year: {start} cannot be greater then ending year {end}")
else:
for i in range(int(start), int(end) + 1):
final_years.append(int(i))
else:
final_years.append(check_year(value, current_year, method))
except AttributeError:
raise Failed(f"Collection Error: {method} failed to parse year from {value}")
return final_years
def check_year(year, current_year, method):
@ -637,9 +675,9 @@ def check_number(value, method, number_type="int", minimum=None, maximum=None):
else:
return num_value
def check_date(date_text, method, return_string=False):
try: date_obg = datetime.strptime(str(date_text), "%m/%d/%Y")
except ValueError: raise Failed(f"Collection Error: {method}: {date_text} must match pattern MM/DD/YYYY e.g. 12/25/2020")
def check_date(date_text, method, return_string=False, plex_date=False):
try: date_obg = datetime.strptime(str(date_text), "%Y/%m/%d" if plex_date else "%m/%d/%Y")
except ValueError: raise Failed(f"Collection Error: {method}: {date_text} must match pattern {'YYYY/MM/DD e.g. 2020/12/25' if plex_date else 'MM/DD/YYYY e.g. 12/25/2020'}")
return str(date_text) if return_string else date_obg
def logger_input(prompt, timeout=60):
@ -692,7 +730,6 @@ def windows_input(prompt, timeout=5):
print("")
raise TimeoutExpired
def print_multiline(lines, info=False, warning=False, error=False, critical=False):
for i, line in enumerate(str(lines).split("\n")):
if critical: logger.critical(line)
@ -729,7 +766,7 @@ def regex_first_int(data, id_type, default=None):
def remove_not(method):
return method[:-4] if method.endswith(".not") else method
def get_centered_text(text):
def centered(text, do_print=True):
if len(text) > screen_width - 2:
raise Failed("text must be shorter then screen_width")
space = screen_width - len(text) - 2
@ -737,7 +774,10 @@ def get_centered_text(text):
text += " "
space -= 1
side = int(space / 2)
return f"{' ' * side}{text}{' ' * side}"
final_text = f"{' ' * side}{text}{' ' * side}"
if do_print:
logger.info(final_text)
return final_text
def separator(text=None):
logger.handlers[0].setFormatter(logging.Formatter(f"%(message)-{screen_width - 2}s"))
@ -746,7 +786,7 @@ def separator(text=None):
if text:
text_list = text.split("\n")
for t in text_list:
logger.info(f"| {get_centered_text(t)} |")
logger.info(f"| {centered(t, do_print=False)} |")
logger.info(f"|{separating_character * screen_width}|")
logger.handlers[0].setFormatter(logging.Formatter(f"| %(message)-{screen_width - 2}s |"))
logger.handlers[1].setFormatter(logging.Formatter(f"[%(asctime)s] %(filename)-27s %(levelname)-10s | %(message)-{screen_width - 2}s |"))

@ -80,14 +80,14 @@ logger.addHandler(file_handler)
sys.excepthook = util.my_except_hook
util.separator()
logger.info(util.get_centered_text(" "))
logger.info(util.get_centered_text(" ____ _ __ __ _ __ __ "))
logger.info(util.get_centered_text("| _ \\| | _____ __ | \\/ | ___| |_ __ _ | \\/ | __ _ _ __ __ _ __ _ ___ _ __ "))
logger.info(util.get_centered_text("| |_) | |/ _ \\ \\/ / | |\\/| |/ _ \\ __/ _` | | |\\/| |/ _` | '_ \\ / _` |/ _` |/ _ \\ '__|"))
logger.info(util.get_centered_text("| __/| | __/> < | | | | __/ || (_| | | | | | (_| | | | | (_| | (_| | __/ | "))
logger.info(util.get_centered_text("|_| |_|\\___/_/\\_\\ |_| |_|\\___|\\__\\__,_| |_| |_|\\__,_|_| |_|\\__,_|\\__, |\\___|_| "))
logger.info(util.get_centered_text(" |___/ "))
logger.info(util.get_centered_text(" Version: 1.5.1 "))
util.centered(" ")
util.centered(" ____ _ __ __ _ __ __ ")
util.centered("| _ \\| | _____ __ | \\/ | ___| |_ __ _ | \\/ | __ _ _ __ __ _ __ _ ___ _ __ ")
util.centered("| |_) | |/ _ \\ \\/ / | |\\/| |/ _ \\ __/ _` | | |\\/| |/ _` | '_ \\ / _` |/ _` |/ _ \\ '__|")
util.centered("| __/| | __/> < | | | | __/ || (_| | | | | | (_| | | | | (_| | (_| | __/ | ")
util.centered("|_| |_|\\___/_/\\_\\ |_| |_|\\___|\\__\\__,_| |_| |_|\\__,_|_| |_|\\__,_|\\__, |\\___|_| ")
util.centered(" |___/ ")
util.centered(" Version: 1.6.0 ")
util.separator()
if my_tests:

@ -1,8 +1,8 @@
# Remove
# Less common, pinned
PlexAPI==4.4.1
PlexAPI==4.5.1
tmdbv3api==1.7.5
trakt.py==4.2.0
trakt.py==4.3.0
# More common, flexible
lxml
requests>=2.4.2

Loading…
Cancel
Save