You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Plex-Meta-Manager/modules/util.py

421 lines
19 KiB

import logging, os, re, signal, sys, time, traceback
from datetime import datetime
from pathvalidate import is_valid_filename, sanitize_filename
from plexapi.exceptions import BadRequest, NotFound, Unauthorized
try:
import msvcrt
windows = True
except ModuleNotFoundError:
windows = False
logger = logging.getLogger("Plex Meta Manager")
class TimeoutExpired(Exception):
pass
class Failed(Exception):
pass
class ImageData:
def __init__(self, attribute, location, prefix="", is_poster=True, is_url=True):
self.attribute = attribute
self.location = location
self.prefix = prefix
self.is_poster = is_poster
self.is_url = is_url
self.compare = location if is_url else os.stat(location).st_size
self.message = f"{prefix}{'poster' if is_poster else 'background'} to [{'URL' if is_url else 'File'}] {location}"
def retry_if_not_failed(exception):
return not isinstance(exception, Failed)
def retry_if_not_plex(exception):
return not isinstance(exception, (BadRequest, NotFound, Unauthorized))
separating_character = "="
screen_width = 100
spacing = 0
days_alias = {
"monday": 0, "mon": 0, "m": 0,
"tuesday": 1, "tues": 1, "tue": 1, "tu": 1, "t": 1,
"wednesday": 2, "wed": 2, "w": 2,
"thursday": 3, "thurs": 3, "thur": 3, "thu": 3, "th": 3, "r": 3,
"friday": 4, "fri": 4, "f": 4,
"saturday": 5, "sat": 5, "s": 5,
"sunday": 6, "sun": 6, "su": 6, "u": 6
}
pretty_days = {0: "Monday", 1: "Tuesday", 2: "Wednesday", 3: "Thursday", 4: "Friday", 5: "Saturday", 6: "Sunday"}
pretty_months = {
1: "January", 2: "February", 3: "March", 4: "April", 5: "May", 6: "June",
7: "July", 8: "August", 9: "September", 10: "October", 11: "November", 12: "December"
}
pretty_seasons = {"winter": "Winter", "spring": "Spring", "summer": "Summer", "fall": "Fall"}
pretty_names = {
"anidb_id": "AniDB ID", "anidb_relation": "AniDB Relation", "anidb_popular": "AniDB Popular",
"anilist_genre": "AniList Genre", "anilist_id": "AniList ID", "anilist_popular": "AniList Popular",
"anilist_relations": "AniList Relations", "anilist_season": "AniList Season", "anilist_studio": "AniList Studio",
"anilist_tag": "AniList Tag", "anilist_top_rated": "AniList Top Rated",
"icheckmovies_list": "I Check Movies List",
"imdb_list": "IMDb List", "imdb_id": "IMDb ID",
"letterboxd_list": "Letterboxd List", "letterboxd_list_details": "Letterboxd List",
"mal_id": "MyAnimeList ID", "mal_all": "MyAnimeList All", "mal_airing": "MyAnimeList Airing",
"mal_upcoming": "MyAnimeList Upcoming", "mal_tv": "MyAnimeList TV", "mal_ova": "MyAnimeList OVA",
"mal_movie": "MyAnimeList Movie", "mal_special": "MyAnimeList Special", "mal_popular": "MyAnimeList Popular",
"mal_favorite": "MyAnimeList Favorite", "mal_season": "MyAnimeList Season",
"mal_suggested": "MyAnimeList Suggested", "mal_userlist": "MyAnimeList Userlist",
"plex_all": "Plex All", "plex_collection": "Plex Collection", "plex_search": "Plex Search",
"tautulli_popular": "Tautulli Popular", "tautulli_watched": "Tautulli Watched",
"tmdb_actor": "TMDb Actor", "tmdb_actor_details": "TMDb Actor",
"tmdb_collection": "TMDb Collection", "tmdb_collection_details": "TMDb Collection",
"tmdb_company": "TMDb Company", "tmdb_crew": "TMDb Crew", "tmdb_crew_details": "TMDb Crew",
"tmdb_director": "TMDb Director", "tmdb_director_details": "TMDb Director", "tmdb_discover": "TMDb Discover",
"tmdb_keyword": "TMDb Keyword", "tmdb_list": "TMDb List", "tmdb_list_details": "TMDb List",
"tmdb_movie": "TMDb Movie", "tmdb_movie_details": "TMDb Movie", "tmdb_network": "TMDb Network",
"tmdb_now_playing": "TMDb Now Playing", "tmdb_person": "TMDb Person", "tmdb_popular": "TMDb Popular",
"tmdb_producer": "TMDb Producer", "tmdb_producer_details": "TMDb Producer",
"tmdb_show": "TMDb Show", "tmdb_show_details": "TMDb Show", "tmdb_top_rated": "TMDb Top Rated",
"tmdb_trending_daily": "TMDb Trending Daily", "tmdb_trending_weekly": "TMDb Trending Weekly",
"tmdb_writer": "TMDb Writer", "tmdb_writer_details": "TMDb Writer",
"trakt_collected": "Trakt Collected", "trakt_collection": "Trakt Collection",
"trakt_list": "Trakt List", "trakt_list_details": "Trakt List",
"trakt_popular": "Trakt Popular", "trakt_recommended": "Trakt Recommended", "trakt_trending": "Trakt Trending",
"trakt_watched": "Trakt Watched", "trakt_watchlist": "Trakt Watchlist",
"tvdb_list": "TVDb List", "tvdb_list_details": "TVDb List",
"tvdb_movie": "TVDb Movie", "tvdb_movie_details": "TVDb Movie",
"tvdb_show": "TVDb Show", "tvdb_show_details": "TVDb Show"
}
pretty_ids = {"anidbid": "AniDB", "imdbid": "IMDb", "mal_id": "MyAnimeList", "themoviedb_id": "TMDb", "thetvdb_id": "TVDb", "tvdbid": "TVDb"}
def tab_new_lines(data):
return str(data).replace("\n", "\n|\t ") if "\n" in str(data) else str(data)
def make_ordinal(n):
n = int(n)
suffix = ["th", "st", "nd", "rd", "th"][min(n % 10, 4)]
if 11 <= (n % 100) <= 13:
suffix = "th"
return str(n) + suffix
def choose_from_list(datalist, description, data=None, list_type="title", exact=False):
if len(datalist) > 0:
if len(datalist) == 1 and (description != "collection" or datalist[0].title == data):
return datalist[0]
zero_option = f"Create New Collection: {data}" if description == "collection" else "Do Nothing"
message = f"Multiple {description}s Found\n0) {zero_option}"
for i, d in enumerate(datalist, 1):
if list_type == "title":
if d.title == data:
return d
message += f"\n{i}) {d.title}"
else:
message += f"\n{i}) [{d[0]}] {d[1]}"
if exact:
return None
print_multiline(message, info=True)
while True:
try:
selection = int(logger_input(f"Choose {description} number")) - 1
if selection >= 0: return datalist[selection]
elif selection == -1: return None
else: logger.info(f"Invalid {description} number")
except IndexError: logger.info(f"Invalid {description} number")
except TimeoutExpired:
if list_type == "title":
logger.warning(f"Input Timeout: using {data}")
return None
else:
logger.warning(f"Input Timeout: using {datalist[0][1]}")
return datalist[0]
else:
return None
def get_bool(method_name, method_data):
if isinstance(method_data, bool):
return method_data
elif str(method_data).lower() in ["t", "true"]:
return True
elif str(method_data).lower() in ["f", "false"]:
return False
else:
raise Failed(f"Collection Error: {method_name} attribute: {method_data} invalid must be either true or false")
def compile_list(data):
if isinstance(data, list):
text = ""
for item in data:
text += f"{',' if len(text) > 0 else ''}{item}"
return text
else:
return data
def get_list(data, lower=False, split=True, int_list=False):
if data is None: return None
elif isinstance(data, list): return data
elif isinstance(data, dict): return [data]
elif split is False: return [str(data)]
elif lower is True: return [d.strip().lower() for d in str(data).split(",")]
elif int_list is True: return [int(d.strip()) for d in str(data).split(",")]
else: return [d.strip() for d in str(data).split(",")]
def get_int_list(data, id_type):
values = get_list(data)
int_values = []
for value in values:
try: int_values.append(regex_first_int(value, id_type))
except Failed as e: logger.error(e)
return int_values
def get_year_list(data, current_year, method):
final_years = []
values = get_list(data)
for value in values:
final_years.append(check_year(value, current_year, method))
return final_years
def check_year(year, current_year, method):
return check_number(year, method, minimum=1800, maximum=current_year)
def check_number(value, method, number_type="int", minimum=None, maximum=None):
if number_type == "int":
try: num_value = int(str(value))
except ValueError: raise Failed(f"Collection Error: {method}: {value} must be an integer")
elif number_type == "float":
try: num_value = float(str(value))
except ValueError: raise Failed(f"Collection Error: {method}: {value} must be a number")
else: raise Failed(f"Number Type: {number_type} invalid")
if minimum is not None and maximum is not None and (num_value < minimum or num_value > maximum):
raise Failed(f"Collection Error: {method}: {num_value} must be between {minimum} and {maximum}")
elif minimum is not None and num_value < minimum:
raise Failed(f"Collection Error: {method}: {num_value} is less then {minimum}")
elif maximum is not None and num_value > maximum:
raise Failed(f"Collection Error: {method}: {num_value} is greater then {maximum}")
else:
return num_value
def validate_date(date_text, method, return_as=None):
try: date_obg = datetime.strptime(str(date_text), "%Y-%m-%d" if "-" in str(date_text) else "%m/%d/%Y")
except ValueError: raise Failed(f"Collection Error: {method}: {date_text} must match pattern YYYY-MM-DD (e.g. 2020-12-25) or MM/DD/YYYY (e.g. 12/25/2020)")
return datetime.strftime(date_obg, return_as) if return_as else date_obg
def logger_input(prompt, timeout=60):
if windows: return windows_input(prompt, timeout)
elif hasattr(signal, "SIGALRM"): return unix_input(prompt, timeout)
else: raise SystemError("Input Timeout not supported on this system")
def header(language="en-US,en;q=0.5"):
return {"Accept-Language": language, "User-Agent": "Mozilla/5.0 x64"}
def alarm_handler(signum, frame):
raise TimeoutExpired
def unix_input(prompt, timeout=60):
prompt = f"| {prompt}: "
signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(timeout)
try: return input(prompt)
except EOFError: raise Failed("Input Failed")
finally: signal.alarm(0)
def old_windows_input(prompt, timeout=60, timer=time.monotonic):
prompt = f"| {prompt}: "
sys.stdout.write(prompt)
sys.stdout.flush()
endtime = timer() + timeout
result = []
while timer() < endtime:
if msvcrt.kbhit():
result.append(msvcrt.getwche())
if result[-1] == "\n":
out = "".join(result[:-1])
logger.debug(f"{prompt[2:]}{out}")
return out
time.sleep(0.04)
raise TimeoutExpired
def windows_input(prompt, timeout=5):
sys.stdout.write(f"| {prompt}: ")
sys.stdout.flush()
result = []
start_time = time.time()
while True:
if msvcrt.kbhit():
char = msvcrt.getwche()
if ord(char) == 13: # enter_key
out = "".join(result)
print("")
logger.debug(f"{prompt}: {out}")
return out
elif ord(char) >= 32: #space_char
result.append(char)
if (time.time() - start_time) > timeout:
print("")
raise TimeoutExpired
def print_multiline(lines, info=False, warning=False, error=False, critical=False):
for i, line in enumerate(str(lines).split("\n")):
if critical: logger.critical(line)
elif error: logger.error(line)
elif warning: logger.warning(line)
elif info: logger.info(line)
else: logger.debug(line)
if i == 0:
logger.handlers[1].setFormatter(logging.Formatter(" " * 65 + "| %(message)s"))
logger.handlers[1].setFormatter(logging.Formatter("[%(asctime)s] %(filename)-27s %(levelname)-10s | %(message)s"))
def print_stacktrace():
print_multiline(traceback.format_exc())
def my_except_hook(exctype, value, tb):
for line in traceback.format_exception(etype=exctype, value=value, tb=tb):
print_multiline(line, critical=True)
def get_id_from_imdb_url(imdb_url):
match = re.search("(tt\\d+)", str(imdb_url))
if match: return match.group(1)
else: raise Failed(f"Regex Error: Failed to parse IMDb ID from IMDb URL: {imdb_url}")
def regex_first_int(data, id_type, default=None):
match = re.search("(\\d+)", str(data))
if match:
return int(match.group(1))
elif default:
logger.warning(f"Regex Warning: Failed to parse {id_type} from {data} using {default} as default")
return int(default)
else:
raise Failed(f"Regex Error: Failed to parse {id_type} from {data}")
def centered(text, sep=" "):
if len(text) > screen_width - 2:
return text
space = screen_width - len(text) - 2
text = f" {text} "
if space % 2 == 1:
text += sep
space -= 1
side = int(space / 2) - 1
final_text = f"{sep * side}{text}{sep * side}"
return final_text
def separator(text=None, space=True, border=True, debug=False):
sep = " " if space else separating_character
for handler in logger.handlers:
apply_formatter(handler, border=False)
border_text = f"|{separating_character * screen_width}|"
if border and debug:
logger.debug(border_text)
elif border:
logger.info(border_text)
if text:
text_list = text.split("\n")
for t in text_list:
logger.info(f"|{sep}{centered(t, sep=sep)}{sep}|")
if border and debug:
logger.debug(border_text)
elif border:
logger.info(border_text)
for handler in logger.handlers:
apply_formatter(handler)
def apply_formatter(handler, border=True):
text = f"| %(message)-{screen_width - 2}s |" if border else f"%(message)-{screen_width - 2}s"
if isinstance(handler, logging.handlers.RotatingFileHandler):
text = f"[%(asctime)s] %(filename)-27s %(levelname)-10s {text}"
handler.setFormatter(logging.Formatter(text))
def adjust_space(display_title):
display_title = str(display_title)
space_length = spacing - len(display_title)
if space_length > 0:
display_title += " " * space_length
return display_title
def print_return(text):
print(adjust_space(f"| {text}"), end="\r")
global spacing
spacing = len(text) + 2
def print_end():
print(adjust_space(" "), end="\r")
global spacing
spacing = 0
def validate_filename(filename):
if is_valid_filename(filename):
return filename, None
else:
mapping_name = sanitize_filename(filename)
return mapping_name, f"Log Folder Name: {filename} is invalid using {mapping_name}"
def is_locked(filepath):
locked = None
file_object = None
if os.path.exists(filepath):
try:
file_object = open(filepath, 'a', 8)
if file_object:
locked = False
except IOError as message:
locked = True
finally:
if file_object:
file_object.close()
return locked
def validate_dict_list(method_name, data):
final_list = []
for dict_data in get_list(data):
if isinstance(dict_data, dict):
final_list.append((dict_data, {dm.lower(): dm for dm in dict_data}))
else:
raise Failed(f"Collection Error: {method_name} attribute is not a dictionary: {dict_data}")
return final_list
def parse_int(method, data, default=10, minimum=1, maximum=None):
list_count = regex_first_int(data, "List Size", default=default)
if maximum is None and list_count < minimum:
logger.warning(f"Collection Warning: {method} must an integer >= {minimum} using {default} as default")
elif maximum is not None and (list_count < minimum or list_count > maximum):
logger.warning(f"Collection Warning: {method} must an integer between {minimum} and {maximum} using {default} as default")
else:
return list_count
return default
def parse_from_dict(parent, method, data, methods, default=None, options=None, translation=None):
message = ""
if options is None and translation is not None:
options = [o for o in translation]
if method not in methods:
message = f"{parent} {method} attribute not found"
elif data[methods[method]] is None:
message = f"{parent} {method} attribute is blank"
elif (translation is not None and str(data[methods[method]]).lower() not in translation) or \
(options is not None and translation is None and str(data[methods[method]]).lower() not in options):
message = f"{parent} {method} attribute {data[methods[method]]} must be in {options}"
else:
return translation[data[methods[method]]] if translation is not None else data[methods[method]]
if default is None:
raise Failed(f"Collection Error: {message}")
else:
logger.warning(f"Collection Warning: {message} using {default} as default")
return translation[default] if translation is not None else default
def parse_int_from_dict(parent, method, data, methods, default, minimum=1, maximum=None):
if method not in methods:
logger.warning(f"Collection Warning: {parent} {method} attribute not found using {default} as default")
elif not data[methods[method]]:
logger.warning(f"Collection Warning: {parent} {methods[method]} attribute is blank using {default} as default")
elif maximum is None and (not isinstance(data[methods[method]], int) or data[methods[method]] < minimum):
logger.warning(f"Collection Warning: {parent} {methods[method]} attribute {data[methods[method]]} must an integer >= {minimum} using {default} as default")
elif maximum is not None and (not isinstance(data[methods[method]], int) or data[methods[method]] < minimum or data[methods[method]] > maximum):
logger.warning(f"Collection Warning: {parent} {methods[method]} attribute {data[methods[method]]} must an integer between {minimum} and {maximum} using {default} as default")
else:
return data[methods[method]]
return default
def parse_list(method, data, methods):
if method in methods and data[methods[method]]:
return [i for i in data[methods[method]] if i] if isinstance(data[methods[method]], list) else [str(data[methods[method]])]
return []