[57] update overlay removal/reset process

pull/1304/head
meisnate12 2 years ago
parent db8fbeac3e
commit e8cc6a6920

@ -1 +1 @@
1.18.3-develop56 1.18.3-develop57

@ -13,10 +13,6 @@ external_templates:
horizontal_align: right horizontal_align: right
vertical_align: bottom vertical_align: bottom
conditionals: conditionals:
overlay_level:
conditions:
- library_type: show
value: episode
vertical_offset: vertical_offset:
default: 15 default: 15
conditions: conditions:

@ -1,9 +1,10 @@
import os import os, time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from modules import util, operations from modules import util, operations
from modules.meta import MetadataFile, OverlayFile from modules.meta import MetadataFile, OverlayFile
from modules.operations import Operations from modules.operations import Operations
from modules.util import Failed, NotScheduled, YAML from modules.util import Failed, NotScheduled, YAML
from PIL import Image, ImageFilter
logger = util.logger logger = util.logger
@ -249,6 +250,40 @@ class Library(ABC):
def item_labels(self, item): def item_labels(self, item):
pass pass
@abstractmethod
def find_poster_url(self, item):
pass
def check_image_for_overlay(self, image_url, image_path, remove=False):
image_response = self.config.get(image_url)
if image_response.status_code >= 400:
raise Failed("Image Download Failed")
if image_response.headers["Content-Type"] not in ["image/png", "image/jpeg", "image/webp"]:
raise Failed("Image Not PNG, JPG, or WEBP")
if image_response.headers["Content-Type"] == "image/jpeg":
image_path += ".jpg"
elif image_response.headers["Content-Type"] == "image/webp":
image_path += ".webp"
else:
image_path += ".png"
with open(image_path, "wb") as handler:
handler.write(image_response.content)
while util.is_locked(image_path):
time.sleep(1)
with Image.open(image_path) as image:
exif_tags = image.getexif()
if 0x04bc in exif_tags and exif_tags[0x04bc] == "overlay":
os.remove(image_path)
raise Failed("Poster already has an Overlay")
if remove:
os.remove(image_path)
else:
return image_path
@abstractmethod
def item_posters(self, item):
pass
@abstractmethod @abstractmethod
def get_all(self, builder_level=None, load=False): def get_all(self, builder_level=None, load=False):
pass pass

@ -1,4 +1,4 @@
import os, re, time import os, re
from datetime import datetime from datetime import datetime
from modules import plex, util, overlay from modules import plex, util, overlay
from modules.builder import CollectionBuilder from modules.builder import CollectionBuilder
@ -170,53 +170,19 @@ class Overlays:
reset_list = ["plex", "tmdb"] reset_list = ["plex", "tmdb"]
else: else:
reset_list = [] reset_list = []
reset_attempted = False try:
for reset in reset_list: new_backup = self.library.item_posters(item)
if reset == "plex": except Failed as e:
reset_attempted = True if any(r in reset_list for r in ["plex", "tmdb"]):
temp_poster = next((p for p in item.posters()), None) logger.error(e)
if temp_poster:
new_backup = f"{self.library.url}{temp_poster.key}&X-Plex-Token={self.library.token}"
break
else:
logger.trace("Plex Error: Plex Poster Download Failed")
if reset == "tmdb":
reset_attempted = True
try:
new_backup = self.find_poster_url(item)
break
except Failed as e:
logger.trace(e)
if reset_attempted and not new_backup:
logger.error("Overlay Error: Reset Failed")
else: else:
new_backup = item.posterUrl new_backup = item.posterUrl
if new_backup: if new_backup:
changed_image = True changed_image = True
image_response = self.config.get(new_backup) try:
if image_response.status_code >= 400: has_original = self.library.check_image_for_overlay(new_backup, os.path.join(self.library.overlay_backup, f"{item.ratingKey}"))
raise Failed(f"{item_title[:60]:<60} | Overlay Error: Image Download Failed") except Failed as e:
if image_response.headers["Content-Type"] not in ["image/png", "image/jpeg", "image/webp"]: raise Failed(f"{item_title[:60]:<60} | Overlay Error: {e}")
raise Failed(f"{item_title[:60]:<60} | Overlay Error: Image Not PNG, JPG, or WEBP")
if image_response.headers["Content-Type"] == "image/jpeg":
i_ext = "jpg"
elif image_response.headers["Content-Type"] == "image/webp":
i_ext = "webp"
else:
i_ext = "png"
backup_image_path = os.path.join(self.library.overlay_backup, f"{item.ratingKey}.{i_ext}")
with open(backup_image_path, "wb") as handler:
handler.write(image_response.content)
while util.is_locked(backup_image_path):
time.sleep(1)
backup_poster = Image.open(backup_image_path)
exif_tags = backup_poster.getexif()
if 0x04bc in exif_tags and exif_tags[0x04bc] == "overlay":
logger.error(f"{item_title[:60]:<60} | Overlay Backup Error: Poster already has an Overlay")
os.remove(backup_image_path)
else:
has_original = backup_image_path
poster_compare = None poster_compare = None
if poster is None and has_original is None: if poster is None and has_original is None:
logger.error(f"{item_title[:60]:<60} | Overlay Error: No poster found") logger.error(f"{item_title[:60]:<60} | Overlay Error: No poster found")
@ -387,6 +353,8 @@ class Overlays:
except Failed as e: except Failed as e:
logger.error(f"{e}\nOverlays Attempted on {item_title}: {', '.join(over_names)}") logger.error(f"{e}\nOverlays Attempted on {item_title}: {', '.join(over_names)}")
except Exception as e: except Exception as e:
logger.info(e)
logger.info(type(e))
logger.stacktrace(e) logger.stacktrace(e)
logger.error("") logger.error("")
logger.error(f"Overlays Attempted on {item_title}: {', '.join(over_names)}") logger.error(f"Overlays Attempted on {item_title}: {', '.join(over_names)}")
@ -509,21 +477,6 @@ class Overlays:
key_to_overlays[over_key][1].remove(v) key_to_overlays[over_key][1].remove(v)
return key_to_overlays, properties return key_to_overlays, properties
def find_poster_url(self, item):
if isinstance(item, Movie):
if item.ratingKey in self.library.movie_rating_key_map:
return self.config.TMDb.get_movie(self.library.movie_rating_key_map[item.ratingKey]).poster_url
elif isinstance(item, (Show, Season, Episode)):
check_key = item.ratingKey if isinstance(item, Show) else item.show().ratingKey
if check_key in self.library.show_rating_key_map:
tmdb_id = self.config.Convert.tvdb_to_tmdb(self.library.show_rating_key_map[check_key])
if isinstance(item, Show) and item.ratingKey in self.library.show_rating_key_map:
return self.config.TMDb.get_show(tmdb_id).poster_url
elif isinstance(item, Season):
return self.config.TMDb.get_season(tmdb_id, item.seasonNumber).poster_url
elif isinstance(item, Episode):
return self.config.TMDb.get_episode(tmdb_id, item.seasonNumber, item.episodeNumber).still_url
def get_overlay_items(self, label="Overlay", libtype=None, ignore=None): def get_overlay_items(self, label="Overlay", libtype=None, ignore=None):
items = self.library.search(label=label, libtype=libtype) items = self.library.search(label=label, libtype=libtype)
return items if not ignore else [o for o in items if o.ratingKey not in ignore] return items if not ignore else [o for o in items if o.ratingKey not in ignore]
@ -542,9 +495,9 @@ class Overlays:
if not poster_location: if not poster_location:
is_url = True is_url = True
try: try:
poster_location = self.find_poster_url(item) poster_location = self.library.item_posters(item)
except Failed as e: except Failed:
logger.error(e) pass
if poster_location: if poster_location:
self.library.upload_poster(item, poster_location, url=is_url) self.library.upload_poster(item, poster_location, url=is_url)
self.library.edit_tags("label", item, remove_tags=[label], do_print=False) self.library.edit_tags("label", item, remove_tags=[label], do_print=False)

@ -595,6 +595,49 @@ class Plex(Library):
except BadRequest: except BadRequest:
raise Failed(f"Item: {item.title} Labels failed to load") raise Failed(f"Item: {item.title} Labels failed to load")
def find_poster_url(self, item):
if isinstance(item, Movie):
if item.ratingKey in self.movie_rating_key_map:
return self.config.TMDb.get_movie(self.movie_rating_key_map[item.ratingKey]).poster_url
elif isinstance(item, (Show, Season, Episode)):
check_key = item.ratingKey if isinstance(item, Show) else item.show().ratingKey
if check_key in self.show_rating_key_map:
tmdb_id = self.config.Convert.tvdb_to_tmdb(self.show_rating_key_map[check_key])
if isinstance(item, Show) and item.ratingKey in self.show_rating_key_map:
return self.config.TMDb.get_show(tmdb_id).poster_url
elif isinstance(item, Season):
return self.config.TMDb.get_season(tmdb_id, item.seasonNumber).poster_url
elif isinstance(item, Episode):
return self.config.TMDb.get_episode(tmdb_id, item.seasonNumber, item.episodeNumber).still_url
def item_posters(self, item, providers=None):
if not providers:
providers = ["plex", "tmdb"]
image_url = None
for provider in providers:
if provider == "plex":
for poster in item.posters():
if poster.key.startswith("/"):
image_url = f"{self.url}{poster.key}&X-Plex-Token={self.token}"
if poster.ratingKey.startswith("upload"):
try:
self.check_image_for_overlay(image_url, os.path.join(self.overlay_backup, "temp"), remove=True)
except Failed as e:
logger.trace(f"Plex Error: {e}")
continue
break
if provider == "tmdb":
try:
image_url = self.find_poster_url(item)
except Failed as e:
logger.trace(e)
continue
if image_url:
break
if not image_url:
raise Failed("Overlay Error: No Poster found to reset")
return image_url
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex) @retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def reload(self, item, force=False): def reload(self, item, force=False):
is_full = False is_full = False

Loading…
Cancel
Save