You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Plex-Meta-Manager/modules/overlays.py

424 lines
24 KiB

import os, re, time
from datetime import datetime
from modules import plex, util
from modules.builder import CollectionBuilder
from modules.util import Failed, NotScheduled
from plexapi.audio import Album
from plexapi.exceptions import BadRequest
from plexapi.video import Movie, Show, Season, Episode
from PIL import Image, ImageFilter
# Module-wide logger, taken from the project's util module; every log call in
# this file goes through it.
logger = util.logger
# Text-overlay keys that are filled in from an item's rating rather than being
# printed literally. Each rating key is accepted bare or with a "%" or "#"
# suffix; the suffix selects how the rating value is formatted when rendered.
special_text_overlays = [
    f"{rating}{suffix}"
    for rating in ("audience_rating", "critic_rating", "user_rating")
    for suffix in ("", "%", "#")
]
class Overlays:
    """Applies, refreshes and removes poster overlays for a single library.

    The instance only stores the global ``config`` and the ``library`` it works
    on; all work happens in :meth:`run_overlays`.
    """

    def __init__(self, config, library):
        """Store the configuration and the library to operate on."""
        self.config = config
        self.library = library
        # Not read by any method of this class.
        self.overlays = []

    # ------------------------------------------------------------------ #
    # Entry point
    # ------------------------------------------------------------------ #
    def run_overlays(self):
        """Run a complete overlay pass over the library.

        The pass has three stages:

        1. Restore every item that still carries a legacy ``"<name> overlay"``
           label.
        2. Restore every item labelled ``"Overlay"`` that is not selected by
           any overlay definition (all of them when
           ``library.remove_overlays`` is set).
        3. Unless ``library.remove_overlays`` is set, render and upload the
           overlays for every selected item.

        ``Failed`` raised while processing a single item is logged and the
        pass continues with the next item.

        Returns:
            str: elapsed wall-clock time with the fractional seconds removed.
        """
        overlay_start = datetime.now()
        logger.info("")
        logger.separator(f"{self.library.name} Library Overlays")
        logger.info("")
        os.makedirs(self.library.overlay_backup, exist_ok=True)

        self._remove_old_overlays()

        key_to_overlays = {}
        queues = {}
        properties = None
        if not self.library.remove_overlays:
            key_to_overlays, properties, queues = self.compile_overlays()
        ignore_list = list(key_to_overlays)

        self._remove_unused_overlays(ignore_list)

        if not self.library.remove_overlays:
            logger.info("")
            logger.separator(f"Applying Overlays for the {self.library.name} Library")
            logger.info("")
            ordered = sorted(key_to_overlays.items(), key=lambda io: self.get_item_sort_title(io[1][0]))
            for i, (_, (item, over_names)) in enumerate(ordered, 1):
                try:
                    item_title = self.get_item_sort_title(item, atr="title")
                    logger.ghost(f"Overlaying: {i}/{len(key_to_overlays)} {item_title}")
                    self._overlay_item(item, item_title, over_names, properties, queues)
                except Failed as e:
                    logger.error(e)
        logger.exorcise()
        overlay_run_time = str(datetime.now() - overlay_start).split('.')[0]
        logger.info("")
        logger.separator(f"Finished {self.library.name} Library Overlays\nOverlays Run Time: {overlay_run_time}")
        return overlay_run_time

    # ------------------------------------------------------------------ #
    # Removal stages
    # ------------------------------------------------------------------ #
    def _remove_old_overlays(self):
        """Restore items that carry a legacy label ending in ``" overlay"``."""
        old_overlays = [
            la for la in self.library.Plex.listFilterChoices("label")
            if str(la.title).lower().endswith(" overlay")
        ]
        if not old_overlays:
            return
        logger.info("")
        logger.separator(f"Removing Old Overlays for the {self.library.name} Library")
        logger.info("")
        for old_overlay in old_overlays:
            label_items = self.get_overlay_items(label=old_overlay)
            if not label_items:
                continue
            logger.info("")
            logger.separator(f"Removing {old_overlay.title}")
            logger.info("")
            for i, item in enumerate(label_items, 1):
                item_title = self.get_item_sort_title(item, atr="title")
                logger.ghost(f"Restoring {old_overlay.title}: {i}/{len(label_items)} {item_title}")
                # title[:-8] drops the trailing " overlay" to get the folder name.
                self.remove_overlay(item, item_title, old_overlay.title, [
                    os.path.join(self.library.overlay_folder, old_overlay.title[:-8], f"{item.ratingKey}.png")
                ])

    def _remove_unused_overlays(self, ignore_list):
        """Restore "Overlay"-labelled items whose rating key is not in ``ignore_list``."""
        remove_overlays = self.get_overlay_items(ignore=ignore_list)
        if self.library.is_show:
            remove_overlays.extend(self.get_overlay_items(libtype="episode", ignore=ignore_list))
            remove_overlays.extend(self.get_overlay_items(libtype="season", ignore=ignore_list))
        elif self.library.is_music:
            remove_overlays.extend(self.get_overlay_items(libtype="album", ignore=ignore_list))

        logger.info("")
        if remove_overlays:
            logger.separator(f"Removing Overlays for the {self.library.name} Library")
            for i, item in enumerate(remove_overlays, 1):
                item_title = self.get_item_sort_title(item, atr="title")
                logger.ghost(f"Restoring: {i}/{len(remove_overlays)} {item_title}")
                self.remove_overlay(item, item_title, "Overlay", [
                    os.path.join(self.library.overlay_backup, f"{item.ratingKey}.png"),
                    os.path.join(self.library.overlay_backup, f"{item.ratingKey}.jpg")
                ])
            logger.exorcise()
        else:
            logger.separator(f"No Overlays to Remove for the {self.library.name} Library")
        logger.info("")

    # ------------------------------------------------------------------ #
    # Special (rating driven) text helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _text_value(over_name):
        """Return the text inside a ``text(...)`` overlay name, else ``None``."""
        return over_name[5:-1] if over_name.startswith("text") else None

    @staticmethod
    def _split_rating(text):
        """Split a special text key into ``(rating_type, per, flat)``.

        ``per`` is True for a trailing ``%`` and ``flat`` for a trailing ``#``;
        ``rating_type`` is the key without that suffix.
        """
        per = text.endswith("%")
        flat = text.endswith("#")
        return (text[:-1] if per or flat else text), per, flat

    def _special_text(self, item, text):
        """Resolve a special text key to the string that should be drawn.

        Logs a warning and returns ``None`` when the item has no value for the
        rating. Otherwise the raw rating is written to the cache (when a cache
        is configured) and returned formatted: ``%`` keys become
        ``int(rating * 10)`` followed by ``%``; ``#`` keys lose a trailing
        ``.0``.
        """
        rating_type, per, flat = self._split_rating(text)
        actual = plex.attribute_translation[rating_type]
        value = getattr(item, actual, None)
        if value is None:
            logger.warning(f"Overlay Warning: No {rating_type} found")
            return None
        if self.config.Cache:
            self.config.Cache.update_overlay_ratings(item.ratingKey, rating_type, value)
        if per:
            value = f"{int(value * 10)}%"
        if flat and str(value).endswith(".0"):
            value = str(value)[:-2]
        return str(value)

    def _ratings_changed(self, item, over_names):
        """Return True when a rating shown by a text overlay differs from the cache.

        Only call this when ``self.config.Cache`` is set. Overlay names have
        the form ``text(<key>)`` while ``special_text_overlays`` holds bare
        keys, so the inner text is what has to be looked up.
        """
        for over_name in over_names:
            text = self._text_value(over_name)
            if text not in special_text_overlays:
                continue
            rating_type, _, _ = self._split_rating(text)
            actual = plex.attribute_translation[rating_type]
            current = getattr(item, actual, None)
            if current is None:
                continue
            if current != self.config.Cache.query_overlay_ratings(item.ratingKey, rating_type):
                return True
        return False

    # ------------------------------------------------------------------ #
    # Backup handling
    # ------------------------------------------------------------------ #
    def _existing_backup(self, item):
        """Return the path of the item's backed-up poster (png, then jpg) or ``None``."""
        for ext in ("png", "jpg"):
            candidate = os.path.join(self.library.overlay_backup, f"{item.ratingKey}.{ext}")
            if os.path.exists(candidate):
                return candidate
        return None

    def _download_backup(self, item, item_title, url):
        """Download ``url`` into the backup folder and return the file path.

        Raises:
            Failed: when the HTTP status code is 400 or higher.
        """
        image_response = self.config.get(url)
        if image_response.status_code >= 400:
            raise Failed(f"{item_title[:60]:<60} | Overlay Error: Poster Download Failed")
        i_ext = "jpg" if image_response.headers["Content-Type"] == "image/jpeg" else "png"
        backup_image_path = os.path.join(self.library.overlay_backup, f"{item.ratingKey}.{i_ext}")
        with open(backup_image_path, "wb") as handler:
            handler.write(image_response.content)
        # Wait until util reports the freshly written file as no longer locked.
        while util.is_locked(backup_image_path):
            time.sleep(1)
        return backup_image_path

    # ------------------------------------------------------------------ #
    # Per-item processing
    # ------------------------------------------------------------------ #
    def _overlay_item(self, item, item_title, over_names, properties, queues):
        """Decide whether ``item`` needs new artwork and, if so, render and upload it.

        Args:
            item: the library item to process.
            item_title: display title used in log lines.
            over_names: names of the overlays selected for the item.
            properties: overlay name -> overlay object, from ``compile_overlays``.
            queues: queue name -> list of coordinates, from ``compile_overlays``.

        Raises:
            Failed: when the poster download fails, or when rendering or
                uploading raises ``OSError`` / ``BadRequest``.
        """
        image_compare = None
        overlay_compare = None
        poster = None
        if self.config.Cache:
            _, image_compare, overlay_compare = self.config.Cache.query_image_map(
                item.ratingKey, f"{self.library.image_table_name}_overlays")
        overlay_compare = [] if overlay_compare is None else util.get_list(overlay_compare, split="|")
        has_overlay = any(item_tag.tag.lower() == "overlay" for item_tag in self.library.item_labels(item))

        compare_names = {properties[ov].get_overlay_compare(): ov for ov in over_names}

        # "blur(N)" names only contribute a blur radius (the largest wins);
        # every other name is an overlay that gets drawn.
        blur_num = 0
        applied_names = []
        for over_name in over_names:
            if over_name.startswith("blur"):
                blur_num = max(blur_num, int(re.search(r"\(([^)]+)\)", over_name).group(1)))
            else:
                applied_names.append(over_name)

        # queue name -> {weight: overlay name}
        queue_overlays = {}
        for over_name in applied_names:
            overlay = properties[over_name]
            if overlay.queue:
                queue_overlays.setdefault(overlay.queue, {})[overlay.weight] = over_name

        # A change is needed when the item has no overlay yet, when the cached
        # overlay set differs from the wanted one, or when an overlay is
        # flagged as updated.
        overlay_change = (
            not has_overlay
            or any(oc not in compare_names for oc in overlay_compare)
            or any(
                compare_name not in overlay_compare or properties[original_name].updated
                for compare_name, original_name in compare_names.items()
            )
        )
        if self.config.Cache and self._ratings_changed(item, over_names):
            overlay_change = True

        try:
            poster, _, item_dir, _ = self.library.find_item_assets(item)
            if not poster and self.library.assets_for_all and self.library.show_missing_assets:
                logger.warning(f"Asset Warning: No poster found in the assets folder '{item_dir}'")
        except Failed as e:
            if self.library.assets_for_all and self.library.show_missing_assets:
                logger.warning(e)

        has_original = None
        changed_image = False
        new_backup = None
        if poster:
            if image_compare and str(poster.compare) != str(image_compare):
                changed_image = True
        elif has_overlay:
            # The current artwork already carries an overlay, so it cannot be
            # used as the base image: use the backup, else fetch a clean poster.
            has_original = self._existing_backup(item)
            if has_original is None:
                new_backup = self.find_poster_url(item)
                if new_backup is None:
                    new_backup = item.posterUrl
        else:
            new_backup = item.posterUrl
        if new_backup:
            changed_image = True
            has_original = self._download_backup(item, item_title, new_backup)

        poster_compare = None
        if poster is None and has_original is None:
            logger.error(f"{item_title[:60]:<60} | Overlay Error: No poster found")
        elif changed_image or overlay_change:
            try:
                new_poster = self._render_poster(
                    item, poster.location if poster else has_original,
                    applied_names, queue_overlays, blur_num, properties, queues)
                temp = os.path.join(self.library.overlay_folder, "temp.png")
                new_poster.save(temp, "PNG")
                self.library.upload_poster(item, temp)
                self.library.edit_tags("label", item, add_tags=["Overlay"], do_print=False)
                self.library.reload(item, force=True)
                poster_compare = poster.compare if poster else item.thumb
                logger.info(f"{item_title[:60]:<60} | Overlays Applied: {', '.join(over_names)}")
            except (OSError, BadRequest) as e:
                logger.stacktrace()
                raise Failed(f"{item_title[:60]:<60} | Overlay Error: {e}") from e
        elif self.library.show_asset_not_needed:
            logger.info(f"{item_title[:60]:<60} | Overlay Update Not Needed")

        if self.config.Cache and poster_compare:
            self.config.Cache.update_image_map(
                item.ratingKey, f"{self.library.image_table_name}_overlays",
                item.thumb, poster_compare, overlay='|'.join(compare_names))

    def _render_poster(self, item, source, applied_names, queue_overlays, blur_num, properties, queues):
        """Open ``source`` and return it with blur and all overlays applied.

        Episodes are rendered on a 1920x1080 canvas, everything else on
        1000x1500.
        """
        is_episode = isinstance(item, Episode)
        canvas_size = (1920, 1080) if is_episode else (1000, 1500)

        # Image.LANCZOS is the same filter as the former Image.ANTIALIAS alias,
        # which newer Pillow releases no longer provide.
        new_poster = Image.open(source).convert("RGB").resize(canvas_size, Image.LANCZOS)
        if blur_num > 0:
            new_poster = new_poster.filter(ImageFilter.GaussianBlur(blur_num))

        for over_name in applied_names:
            overlay = properties[over_name]
            if over_name.startswith("text"):
                text = over_name[5:-1]
                if text in special_text_overlays:
                    text = self._special_text(item, text)
                    if text is None:
                        continue
                    overlay_image, _ = overlay.get_backdrop(canvas_size, text=text)
                else:
                    overlay_image = overlay.landscape if is_episode else overlay.portrait
                new_poster.paste(overlay_image, (0, 0), overlay_image)
            elif overlay.has_coordinates():
                if overlay.portrait is not None:
                    overlay_image = overlay.landscape if is_episode else overlay.portrait
                    new_poster.paste(overlay_image, (0, 0), overlay_image)
                overlay_box = overlay.landscape_box if is_episode else overlay.portrait_box
                new_poster.paste(overlay.image, overlay_box, overlay.image)
            else:
                # Overlays without coordinates dictate the final image size.
                new_poster = new_poster.resize(overlay.image.size, Image.LANCZOS)
                new_poster.paste(overlay.image, (0, 0), overlay.image)

        for queue, weights in queue_overlays.items():
            if queue not in queues:
                logger.error(f"Overlay Error: no queue {queue} found")
                continue
            # Highest weight takes the first queue position; overlays beyond
            # the number of positions are not drawn.
            sorted_weights = sorted(weights.items(), reverse=True)
            for cord, (_, over_name) in zip(queues[queue], sorted_weights):
                overlay = properties[over_name]
                if over_name.startswith("text"):
                    text = over_name[5:-1]
                    if text in special_text_overlays:
                        text = self._special_text(item, text)
                        if text is None:
                            continue
                    overlay_image, _ = overlay.get_backdrop(canvas_size, text=str(text), new_cords=cord)
                    new_poster.paste(overlay_image, (0, 0), overlay_image)
                else:
                    if overlay.has_back:
                        overlay_image, overlay_box = overlay.get_backdrop(
                            canvas_size, box=overlay.image.size, new_cords=cord)
                        new_poster.paste(overlay_image, (0, 0), overlay_image)
                    else:
                        overlay_box = (cord[1], cord[3])
                    new_poster.paste(overlay.image, overlay_box, overlay.image)
        return new_poster

    # ------------------------------------------------------------------ #
    # Public helpers
    # ------------------------------------------------------------------ #
    def get_item_sort_title(self, item_to_sort, atr="titleSort"):
        """Return a title for ``item_to_sort`` built from attribute ``atr``.

        Albums are prefixed with their artist, seasons and episodes with their
        show; any other item simply returns its own ``atr`` attribute.
        """
        if isinstance(item_to_sort, Album):
            return f"{getattr(item_to_sort.artist(), atr)} Album {getattr(item_to_sort, atr)}"
        elif isinstance(item_to_sort, Season):
            return f"{getattr(item_to_sort.show(), atr)} Season {item_to_sort.seasonNumber}"
        elif isinstance(item_to_sort, Episode):
            return f"{getattr(item_to_sort.show(), atr)} {item_to_sort.seasonEpisode.upper()}"
        else:
            return getattr(item_to_sort, atr)

    def compile_overlays(self):
        """Collect, for every overlay file of the library, which overlays apply to which items.

        Returns:
            tuple: ``(key_to_overlays, properties, queues)`` where

            * ``key_to_overlays`` maps rating key -> ``(item, [overlay names])``,
            * ``properties`` maps overlay name -> overlay object,
            * ``queues`` maps queue name -> list of parsed coordinates.

        Raises:
            Failed: when a queue definition is not a list. Other ``Failed`` /
                ``NotScheduled`` errors are logged and the entry is skipped.
        """
        key_to_item = {}
        properties = {}
        overlay_groups = {}
        key_to_overlays = {}
        queues = {}

        for overlay_file in self.library.overlay_files:
            for k, v in overlay_file.queues.items():
                if not isinstance(v, list):
                    raise Failed(f"Overlay Error: Queue: {k} must be a list")
                try:
                    queues[k] = [util.parse_cords(q, f"{k} queue", required=True) for q in v]
                except Failed as e:
                    logger.error(e)
            for k, v in overlay_file.overlays.items():
                try:
                    builder = CollectionBuilder(self.config, overlay_file, k, v, library=self.library, overlay=True)
                    logger.info("")
                    logger.separator(f"Gathering Items for {k} Overlay", space=False, border=False)

                    # The first definition of an overlay name wins.
                    if builder.overlay.name not in properties:
                        properties[builder.overlay.name] = builder.overlay

                    for method, value in builder.builders:
                        logger.debug("")
                        logger.debug(f"Builder: {method}: {value}")
                        logger.info("")
                        builder.filter_and_save_items(builder.gather_ids(method, value))

                    if builder.filters or builder.tmdb_filters:
                        logger.info("")
                        for filter_key, filter_value in builder.filters:
                            logger.info(f"Collection Filter {filter_key}: {filter_value}")
                        for filter_key, filter_value in builder.tmdb_filters:
                            logger.info(f"Collection Filter {filter_key}: {filter_value}")

                    added_titles = []
                    if builder.added_items:
                        overlay_keys = properties[builder.overlay.name].keys
                        for item in builder.added_items:
                            key_to_item[item.ratingKey] = item
                            added_titles.append(item)
                            if item.ratingKey not in overlay_keys:
                                overlay_keys.append(item.ratingKey)
                    if added_titles:
                        logger.debug(f"{len(added_titles)} Titles Found: {[self.get_item_sort_title(a, atr='title') for a in added_titles]}")
                    logger.info(f"{len(added_titles) if added_titles else 'No'} Items found for {builder.overlay.name}")
                except NotScheduled as e:
                    logger.info(e)
                except Failed as e:
                    logger.stacktrace()
                    logger.error(e)

        # Record group membership and drop suppressed overlays from the items
        # that the suppressing overlay also covers.
        for overlay_name, over_obj in properties.items():
            if over_obj.group:
                overlay_groups.setdefault(over_obj.group, {})[overlay_name] = over_obj.weight
            for rk in over_obj.keys:
                for suppress_name in over_obj.suppress:
                    if suppress_name in properties and rk in properties[suppress_name].keys:
                        properties[suppress_name].keys.remove(rk)

        for overlay_name, over_obj in properties.items():
            for over_key in over_obj.keys:
                if over_key not in key_to_overlays:
                    key_to_overlays[over_key] = (key_to_item[over_key], [])
                key_to_overlays[over_key][1].append(overlay_name)

        # Within each group keep only the highest-weight overlay per item
        # (the first one encountered wins a tie).
        for over_key, (item, over_names) in key_to_overlays.items():
            group_status = {}
            for over_name in over_names:
                for overlay_group, group_names in overlay_groups.items():
                    if over_name in group_names:
                        group_status.setdefault(overlay_group, []).append(over_name)
            for gk, gv in group_status.items():
                if len(gv) > 1:
                    final = max(gv, key=lambda name: overlay_groups[gk][name])
                    for v in gv:
                        if final != v:
                            over_names.remove(v)
        return key_to_overlays, properties, queues

    def find_poster_url(self, item):
        """Return a TMDb artwork URL for ``item``, or ``None`` when none can be resolved.

        Movies and shows/seasons/episodes are looked up through the library's
        rating-key maps; episodes return a still URL, everything else a poster
        URL. A ``Failed`` raised during the lookup is logged and ``None`` is
        returned.
        """
        try:
            if isinstance(item, Movie):
                if item.ratingKey in self.library.movie_rating_key_map:
                    return self.config.TMDb.get_movie(self.library.movie_rating_key_map[item.ratingKey]).poster_url
            elif isinstance(item, (Show, Season, Episode)):
                check_key = item.ratingKey if isinstance(item, Show) else item.show().ratingKey
                if check_key in self.library.show_rating_key_map:
                    tmdb_id = self.config.Convert.tvdb_to_tmdb(self.library.show_rating_key_map[check_key])
                    if isinstance(item, Show) and item.ratingKey in self.library.show_rating_key_map:
                        return self.config.TMDb.get_show(tmdb_id).poster_url
                    elif isinstance(item, Season):
                        return self.config.TMDb.get_season(tmdb_id, item.seasonNumber).poster_url
                    elif isinstance(item, Episode):
                        return self.config.TMDb.get_episode(tmdb_id, item.seasonNumber, item.episodeNumber).still_url
        except Failed as e:
            logger.error(e)
        return None

    def get_overlay_items(self, label="Overlay", libtype=None, ignore=None):
        """Search the library for items with ``label``.

        Args:
            label: label to search for.
            libtype: optional item type passed through to the search.
            ignore: optional collection of rating keys to leave out.

        Returns:
            The search result, without the items whose rating key is in
            ``ignore`` when ``ignore`` is non-empty.
        """
        items = self.library.search(label=label, libtype=libtype)
        if not ignore:
            return items
        return [o for o in items if o.ratingKey not in ignore]

    def remove_overlay(self, item, item_title, label, locations):
        """Restore the original poster of ``item`` and remove ``label`` from it.

        The poster source is, in order of preference: the item's asset poster,
        the first existing file in ``locations`` (deleted after a successful
        restore), or a URL from :meth:`find_poster_url`. When none is found an
        error is logged and the item is left untouched.
        """
        try:
            poster, _, _, _ = self.library.find_item_assets(item)
        except Failed:
            poster = None

        is_url = False
        original = None
        if poster:
            poster_location = poster.location
        else:
            original = next((loc for loc in locations if os.path.exists(loc)), None)
            if original is not None:
                poster_location = original
            else:
                is_url = True
                poster_location = self.find_poster_url(item)

        if poster_location:
            self.library.upload_poster(item, poster_location, url=is_url)
            self.library.edit_tags("label", item, remove_tags=[label], do_print=False)
            if original:
                os.remove(original)
        else:
            logger.error(f"No Poster found to restore for {item_title}")