You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Plex-Meta-Manager/modules/util.py

643 lines
21 KiB

import datetime, logging, re, signal, sys, time, traceback
try:
import msvcrt
windows = True
except ModuleNotFoundError:
windows = False
logger = logging.getLogger("Plex Meta Manager")
class TimeoutExpired(Exception):
pass
class Failed(Exception):
pass
def retry_if_not_failed(exception):
return not isinstance(exception, Failed)
seperating_character = "="
screen_width = 100
method_alias = {
"actors": "actor", "role": "actor", "roles": "actor",
"content_ratings": "content_rating", "contentRating": "content_rating", "contentRatings": "content_rating",
"countries": "country",
"decades": "decade",
"directors": "director",
"genres": "genre",
"studios": "studio", "network": "studio", "networks": "studio",
"producers": "producer",
"writers": "writer",
"years": "year"
}
filter_alias = {
"actor": "actors",
"audio_language": "audio_language",
"collection": "collections",
"content_rating": "contentRating",
"country": "countries",
"director": "directors",
"genre": "genres",
"max_age": "max_age",
"originally_available": "originallyAvailableAt",
"original_language": "original_language",
"rating": "rating",
"studio": "studio",
"subtitle_language": "subtitle_language",
"writer": "writers",
"video_resolution": "video_resolution",
"year": "year"
}
days_alias = {
"monday": 0, "mon": 0, "m": 0,
"tuesday": 1, "tues": 1, "tue": 1, "tu": 1, "t": 1,
"wednesday": 2, "wed": 2, "w": 2,
"thursday": 3, "thurs": 3, "thur": 3, "thu": 3, "th": 3, "r": 3,
"friday": 4, "fri": 4, "f": 4,
"saturday": 5, "sat": 5, "s": 5,
"sunday": 6, "sun": 6, "su": 6, "u": 6
}
pretty_days = {
0: "Monday",
1: "Tuesday",
2: "Wednesday",
3: "Thursday",
4: "Friday",
5: "Saturday",
6: "Sunday"
}
pretty_months = {
1: "January",
2: "February",
3: "March",
4: "April",
5: "May",
6: "June",
7: "July",
8: "August",
9: "September",
10: "October",
11: "November",
12: "December"
}
pretty_seasons = {
"winter": "Winter",
"spring": "Spring",
"summer": "Summer",
"fall": "Fall"
}
pretty_names = {
"anidb_id": "AniDB ID",
"anidb_relation": "AniDB Relation",
"anidb_popular": "AniDB Popular",
"imdb_list": "IMDb List",
"imdb_id": "IMDb ID",
"mal_id": "MyAnimeList ID",
"mal_all": "MyAnimeList All",
"mal_airing": "MyAnimeList Airing",
"mal_upcoming": "MyAnimeList Upcoming",
"mal_tv": "MyAnimeList TV",
"mal_ova": "MyAnimeList OVA",
"mal_movie": "MyAnimeList Movie",
"mal_special": "MyAnimeList Special",
"mal_popular": "MyAnimeList Popular",
"mal_favorite": "MyAnimeList Favorite",
"mal_season": "MyAnimeList Season",
"mal_suggested": "MyAnimeList Suggested",
"mal_userlist": "MyAnimeList Userlist",
"plex_all": "Plex All",
"plex_collection": "Plex Collection",
"plex_search": "Plex Search",
"tautulli_popular": "Tautulli Popular",
"tautulli_watched": "Tautulli Watched",
"tmdb_collection": "TMDb Collection",
"tmdb_collection_details": "TMDb Collection",
"tmdb_company": "TMDb Company",
"tmdb_discover": "TMDb Discover",
"tmdb_keyword": "TMDb Keyword",
"tmdb_list": "TMDb List",
"tmdb_list_details": "TMDb List",
"tmdb_movie": "TMDb Movie",
"tmdb_movie_details": "TMDb Movie",
"tmdb_network": "TMDb Network",
"tmdb_now_playing": "TMDb Now Playing",
"tmdb_person": "TMDb Person",
"tmdb_popular": "TMDb Popular",
"tmdb_show": "TMDb Show",
"tmdb_show_details": "TMDb Show",
"tmdb_top_rated": "TMDb Top Rated",
"tmdb_trending_daily": "TMDb Trending Daily",
"tmdb_trending_weekly": "TMDb Trending Weekly",
"trakt_list": "Trakt List",
"trakt_trending": "Trakt Trending",
"trakt_watchlist": "Trakt Watchlist",
"tvdb_list": "TVDb List",
"tvdb_movie": "TVDb Movie",
"tvdb_show": "TVDb Show"
}
mal_ranked_name = {
"mal_all": "all",
"mal_airing": "airing",
"mal_upcoming": "upcoming",
"mal_tv": "tv",
"mal_ova": "ova",
"mal_movie": "movie",
"mal_special": "special",
"mal_popular": "bypopularity",
"mal_favorite": "favorite"
}
mal_season_sort = {
"anime_score": "anime_score",
"anime_num_list_users": "anime_num_list_users",
"score": "anime_score",
"members": "anime_num_list_users"
}
mal_pretty = {
"anime_score": "Score",
"anime_num_list_users": "Members",
"list_score": "Score",
"list_updated_at": "Last Updated",
"anime_title": "Title",
"anime_start_date": "Start Date",
"all": "All Anime",
"watching": "Currently Watching",
"completed": "Completed",
"on_hold": "On Hold",
"dropped": "Dropped",
"plan_to_watch": "Plan to Watch"
}
mal_userlist_sort = {
"score": "list_score",
"list_score": "list_score",
"last_updated": "list_updated_at",
"list_updated": "list_updated_at",
"list_updated_at": "list_updated_at",
"title": "anime_title",
"anime_title": "anime_title",
"start_date": "anime_start_date",
"anime_start_date": "anime_start_date"
}
mal_userlist_status = [
"all",
"watching",
"completed",
"on_hold",
"dropped",
"plan_to_watch"
]
pretty_ids = {
"anidbid": "AniDB",
"imdbid": "IMDb",
"mal_id": "MyAnimeList",
"themoviedb_id": "TMDb",
"thetvdb_id": "TVDb",
"tvdbid": "TVDb"
}
all_lists = [
"anidb_id",
"anidb_relation",
"anidb_popular",
"imdb_list",
"imdb_id",
"mal_id",
"mal_all",
"mal_airing",
"mal_upcoming",
"mal_tv",
"mal_ova",
"mal_movie",
"mal_special",
"mal_popular",
"mal_favorite",
"mal_season",
"mal_suggested",
"mal_userlist",
"plex_collection",
"plex_search",
"tautulli_popular",
"tautulli_watched",
"tmdb_collection",
"tmdb_collection_details",
"tmdb_company",
"tmdb_discover",
"tmdb_keyword",
"tmdb_list",
"tmdb_list_details",
"tmdb_movie",
"tmdb_movie_details",
"tmdb_network",
"tmdb_now_playing",
"tmdb_popular",
"tmdb_show",
"tmdb_show_details",
"tmdb_top_rated",
"tmdb_trending_daily",
"tmdb_trending_weekly",
"trakt_list",
"trakt_trending",
"trakt_watchlist",
"tvdb_list",
"tvdb_movie",
"tvdb_show"
]
collectionless_lists = [
"sort_title", "content_rating",
"summary", "tmdb_summary", "tmdb_description", "tmdb_biography",
"collection_order", "plex_collectionless",
"url_poster", "tmdb_poster", "tmdb_profile", "file_poster",
"url_background", "file_background",
"name_mapping"
]
other_attributes = [
"schedule",
"sync_mode",
"test",
"tmdb_person"
]
dictionary_lists = [
"filters",
"mal_season",
"mal_userlist",
"plex_collectionless",
"plex_search",
"tautulli_popular",
"tautulli_watched",
"tmdb_discover"
]
plex_searches = [
"actor", #"actor.not", # Waiting on PlexAPI to fix issue
"country", #"country.not",
"decade", #"decade.not",
"director", #"director.not",
"genre", #"genre.not",
"producer", #"producer.not",
"studio", #"studio.not",
"writer", #"writer.not"
"year" #"year.not",
]
show_only_lists = [
"tmdb_network",
"tmdb_show",
"tmdb_show_details",
"tvdb_show"
]
movie_only_lists = [
"tmdb_collection",
"tmdb_collection_details",
"tmdb_movie",
"tmdb_movie_details",
"tmdb_now_playing",
"tvdb_movie"
]
movie_only_searches = [
"actor", "actor.not",
"country", "country.not",
"decade", "decade.not",
"director", "director.not",
"producer", "producer.not",
"writer", "writer.not"
]
tmdb_searches = [
"actor", "actor.not",
"director", "director.not",
"producer", "producer.not",
"writer", "writer.not"
]
count_lists = [
"anidb_popular",
"mal_all",
"mal_airing",
"mal_upcoming",
"mal_tv",
"mal_ova",
"mal_movie",
"mal_special",
"mal_popular",
"mal_favorite",
"mal_suggested",
"tmdb_popular",
"tmdb_top_rated",
"tmdb_now_playing",
"tmdb_trending_daily",
"tmdb_trending_weekly",
"trakt_trending"
]
tmdb_lists = [
"tmdb_collection",
"tmdb_collection_details",
"tmdb_company",
"tmdb_discover",
"tmdb_keyword",
"tmdb_list",
"tmdb_list_details",
"tmdb_movie",
"tmdb_movie_details",
"tmdb_network",
"tmdb_now_playing",
"tmdb_popular",
"tmdb_show",
"tmdb_show_details",
"tmdb_top_rated",
"tmdb_trending_daily",
"tmdb_trending_weekly"
]
tmdb_type = {
"tmdb_collection": "Collection",
"tmdb_collection_details": "Collection",
"tmdb_company": "Company",
"tmdb_keyword": "Keyword",
"tmdb_list": "List",
"tmdb_list_details": "List",
"tmdb_movie": "Movie",
"tmdb_movie_details": "Movie",
"tmdb_network": "Network",
"tmdb_person": "Person",
"tmdb_show": "Show",
"tmdb_show_details": "Show"
}
all_filters = [
"actor", "actor.not",
"audio_language", "audio_language.not",
"collection", "collection.not",
"content_rating", "content_rating.not",
"country", "country.not",
"director", "director.not",
"genre", "genre.not",
"max_age",
"originally_available.gte", "originally_available.lte",
"original_language", "original_language.not",
"rating.gte", "rating.lte",
"studio", "studio.not",
"subtitle_language", "subtitle_language.not",
"video_resolution", "video_resolution.not",
"writer", "writer.not",
"year", "year.gte", "year.lte", "year.not"
]
movie_only_filters = [
"audio_language", "audio_language.not",
"country", "country.not",
"director", "director.not",
"original_language", "original_language.not",
"subtitle_language", "subtitle_language.not",
"video_resolution", "video_resolution.not",
"writer", "writer.not"
]
all_details = [
"sort_title", "content_rating",
"summary", "tmdb_summary", "tmdb_description", "tmdb_biography",
"collection_mode", "collection_order",
"url_poster", "tmdb_poster", "tmdb_profile", "file_poster",
"url_background", "file_background",
"name_mapping", "add_to_arr"
]
discover_movie = [
"language", "with_original_language", "region", "sort_by",
"certification_country", "certification", "certification.lte", "certification.gte",
"include_adult",
"primary_release_year", "primary_release_date.gte", "primary_release_date.lte",
"release_date.gte", "release_date.lte", "year",
"vote_count.gte", "vote_count.lte",
"vote_average.gte", "vote_average.lte",
"with_cast", "with_crew", "with_people",
"with_companies",
"with_genres", "without_genres",
"with_keywords", "without_keywords",
"with_runtime.gte", "with_runtime.lte"
]
discover_tv = [
"language", "with_original_language", "timezone", "sort_by",
"air_date.gte", "air_date.lte",
"first_air_date.gte", "first_air_date.lte", "first_air_date_year",
"vote_count.gte", "vote_count.lte",
"vote_average.gte", "vote_average.lte",
"with_genres", "without_genres",
"with_keywords", "without_keywords",
"with_networks", "with_companies",
"with_runtime.gte", "with_runtime.lte",
"include_null_first_air_dates",
"screened_theatrically"
]
discover_movie_sort = [
"popularity.asc", "popularity.desc",
"release_date.asc", "release_date.desc",
"revenue.asc", "revenue.desc",
"primary_release_date.asc", "primary_release_date.desc",
"original_title.asc", "original_title.desc",
"vote_average.asc", "vote_average.desc",
"vote_count.asc", "vote_count.desc"
]
discover_tv_sort = [
"vote_average.desc", "vote_average.asc",
"first_air_date.desc", "first_air_date.asc",
"popularity.desc", "popularity.asc"
]
def adjust_space(old_length, display_title):
display_title = str(display_title)
space_length = old_length - len(display_title)
if space_length > 0:
display_title += " " * space_length
return display_title
def make_ordinal(n):
n = int(n)
suffix = ["th", "st", "nd", "rd", "th"][min(n % 10, 4)]
if 11 <= (n % 100) <= 13:
suffix = "th"
return str(n) + suffix
def choose_from_list(datalist, description, data=None, list_type="title", exact=False):
if len(datalist) > 0:
if len(datalist) == 1 and (description != "collection" or datalist[0].title == data):
return datalist[0]
message = "Multiple {}s Found\n0) {}".format(description, "Create New Collection: {}".format(data) if description == "collection" else "Do Nothing")
for i, d in enumerate(datalist, 1):
if list_type == "title":
if d.title == data:
return d
message += "\n{}) {}".format(i, d.title)
else:
message += "\n{}) [{}] {}".format(i, d[0], d[1])
if exact:
return None
print_multiline(message, info=True)
while True:
try:
selection = int(logger_input("Choose {} number".format(description))) - 1
if selection >= 0: return datalist[selection]
elif selection == -1: return None
else: logger.info("Invalid {} number".format(description))
except IndexError: logger.info("Invalid {} number".format(description))
except TimeoutExpired:
if list_type == "title":
logger.warning("Input Timeout: using {}".format(data))
return None
else:
logger.warning("Input Timeout: using {}".format(datalist[0][1]))
return datalist[0][1]
else:
return None
def get_list(data, lower=False, split=True):
if isinstance(data, list): return data
elif isinstance(data, dict): return [data]
elif split is False: return [str(data)]
elif lower is True: return [d.strip().lower() for d in str(data).split(",")]
else: return [d.strip() for d in str(data).split(",")]
def get_int_list(data, id_type):
values = get_list(data)
int_values = []
for value in values:
try: int_values.append(regex_first_int(value, id_type))
except Failed as e: logger.error(e)
return int_values
def get_year_list(data, method):
values = get_list(data)
final_years = []
current_year = datetime.datetime.now().year
for value in values:
try:
if "-" in value:
year_range = re.search("(\\d{4})-(\\d{4}|NOW)", str(value))
start = year_range.group(1)
end = year_range.group(2)
if end == "NOW":
end = current_year
if int(start) < 1800 or int(start) > current_year: logger.error("Collection Error: Skipping {} starting year {} must be between 1800 and {}".format(method, start, current_year))
elif int(end) < 1800 or int(end) > current_year: logger.error("Collection Error: Skipping {} ending year {} must be between 1800 and {}".format(method, end, current_year))
elif int(start) > int(end): logger.error("Collection Error: Skipping {} starting year {} cannot be greater then ending year {}".format(method, start, end))
else:
for i in range(int(start), int(end) + 1):
final_years.append(i)
else:
year = re.search("(\\d+)", str(value)).group(1)
if int(start) < 1800 or int(start) > current_year:
logger.error("Collection Error: Skipping {} year {} must be between 1800 and {}".format(method, year, current_year))
else:
if len(str(year)) != len(str(value)):
logger.warning("Collection Warning: {} can be replaced with {}".format(value, year))
final_years.append(year)
except AttributeError:
logger.error("Collection Error: Skipping {} failed to parse year from {}".format(method, value))
return final_years
def logger_input(prompt, timeout=60):
if windows: return windows_input(prompt, timeout)
elif hasattr(signal, "SIGALRM"): return unix_input(prompt, timeout)
else: raise SystemError("Input Timeout not supported on this system")
def alarm_handler(signum, frame):
raise TimeoutExpired
def unix_input(prompt, timeout=60):
prompt = "| {}: ".format(prompt)
signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(timeout)
try: return input(prompt)
finally: signal.alarm(0)
def old_windows_input(prompt, timeout=60, timer=time.monotonic):
prompt = "| {}: ".format(prompt)
sys.stdout.write(prompt)
sys.stdout.flush()
endtime = timer() + timeout
result = []
while timer() < endtime:
if msvcrt.kbhit():
result.append(msvcrt.getwche())
if result[-1] == "\n":
out = "".join(result[:-1])
logger.debug("{}{}".format(prompt[2:], out))
return out
time.sleep(0.04)
raise TimeoutExpired
def windows_input(prompt, timeout=5):
sys.stdout.write("| {}: ".format(prompt))
sys.stdout.flush()
result = []
start_time = time.time()
while True:
if msvcrt.kbhit():
chr = msvcrt.getwche()
if ord(chr) == 13: # enter_key
out = "".join(result)
print("")
logger.debug("{}: {}".format(prompt, out))
return out
elif ord(chr) >= 32: #space_char
result.append(chr)
if (time.time() - start_time) > timeout:
print("")
raise TimeoutExpired
def print_multiline(lines, info=False, warning=False, error=False, critical=False):
for i, line in enumerate(lines.split("\n")):
if critical: logger.critical(line)
elif error: logger.error(line)
elif warning: logger.warning(line)
elif info: logger.info(line)
else: logger.debug(line)
if i == 0:
logger.handlers[1].setFormatter(logging.Formatter(" " * 65 + "| %(message)s"))
logger.handlers[1].setFormatter(logging.Formatter("[%(asctime)s] %(filename)-27s %(levelname)-10s | %(message)s"))
def print_stacktrace():
print_multiline(traceback.format_exc())
def my_except_hook(exctype, value, tb):
for line in traceback.format_exception(etype=exctype, value=value, tb=tb):
print_multiline(line, critical=True)
def get_id_from_imdb_url(imdb_url):
match = re.search("(tt\\d+)", str(imdb_url))
if match: return match.group(1)
else: raise Failed("Regex Error: Failed to parse IMDb ID from IMDb URL: {}".format(imdb_url))
def regex_first_int(data, id_type, default=None):
match = re.search("(\\d+)", str(data))
if match:
return int(match.group(1))
elif default:
logger.warning("Regex Warning: Failed to parse {} from {} using {} as default".format(id_type, data, default))
return int(default)
else:
raise Failed("Regex Error: Failed to parse {} from {}".format(id_type, data))
def remove_not(method):
return method[:-4] if method.endswith(".not") else method
def get_centered_text(text):
if len(text) > screen_width - 2:
raise Failed("text must be shorter then screen_width")
space = screen_width - len(text) - 2
if space % 2 == 1:
text += " "
space -= 1
side = int(space / 2)
return "{}{}{}".format(" " * side, text, " " * side)
def seperator(text=None):
logger.handlers[0].setFormatter(logging.Formatter("%(message)-{}s".format(screen_width - 2)))
logger.handlers[1].setFormatter(logging.Formatter("[%(asctime)s] %(filename)-27s %(levelname)-10s %(message)-{}s".format(screen_width - 2)))
logger.info("|{}|".format(seperating_character * screen_width))
if text:
logger.info("| {} |".format(get_centered_text(text)))
logger.info("|{}|".format(seperating_character * screen_width))
logger.handlers[0].setFormatter(logging.Formatter("| %(message)-{}s |".format(screen_width - 2)))
logger.handlers[1].setFormatter(logging.Formatter("[%(asctime)s] %(filename)-27s %(levelname)-10s | %(message)-{}s |".format(screen_width - 2)))
def print_return(length, text):
print(adjust_space(length, "| {}".format(text)), end="\r")
return len(text) + 2
def print_end(length, text=None):
if text: logger.info(adjust_space(length, text))
else: print(adjust_space(length, " "), end="\r")