[93] rating overlays

pull/877/head
meisnate12 3 years ago
parent 2c9ce0dfb4
commit 9cfe405e27

@ -1 +1 @@
1.16.5-develop92 1.16.5-develop93

@ -1,7 +1,7 @@
import os, re, time import os, re, time
from datetime import datetime from datetime import datetime
from modules import anidb, anilist, flixpatrol, icheckmovies, imdb, letterboxd, mal, plex, radarr, reciperr, sonarr, tautulli, tmdb, trakt, tvdb, mdblist, util from modules import anidb, anilist, flixpatrol, icheckmovies, imdb, letterboxd, mal, plex, radarr, reciperr, sonarr, tautulli, tmdb, trakt, tvdb, mdblist, util
from modules.util import Failed, ImageData, NotScheduled, NotScheduledRange from modules.util import Failed, NotScheduled, NotScheduledRange, Overlay
from plexapi.audio import Artist, Album, Track from plexapi.audio import Artist, Album, Track
from plexapi.exceptions import BadRequest, NotFound from plexapi.exceptions import BadRequest, NotFound
from plexapi.video import Movie, Show, Season, Episode from plexapi.video import Movie, Show, Season, Episode
@ -232,101 +232,22 @@ class CollectionBuilder:
if not found_type: if not found_type:
raise NotScheduled(f"Skipped because allowed_library_types {self.data[methods['allowed_library_types']]} doesn't match the library type: {self.library.Plex.type}") raise NotScheduled(f"Skipped because allowed_library_types {self.data[methods['allowed_library_types']]} doesn't match the library type: {self.library.Plex.type}")
self.suppress_overlays = []
self.overlay_group = None
self.overlay_weight = None
self.overlay_path = None
self.overlay_coordinates = None
if self.overlay: if self.overlay:
if "overlay" in methods: if "overlay" in methods:
logger.debug("") overlay_data = data[methods["overlay"]]
logger.debug("Validating Method: overlay")
logger.debug(f"Value: {data[methods['overlay']]}")
if isinstance(data[methods["overlay"]], dict):
if "name" not in data[methods["overlay"]] or not data[methods["overlay"]]["name"]:
raise Failed(f"{self.Type} Error: overlay must have the name attribute")
self.overlay = str(data[methods["overlay"]]["name"])
if "group" in data[methods["overlay"]] and data[methods["overlay"]]["group"]:
self.overlay_group = str(data[methods["overlay"]]["group"])
if "weight" in data[methods["overlay"]] and data[methods["overlay"]]["weight"] is not None:
pri = util.check_num(data[methods["overlay"]]["weight"])
if pri is None:
raise Failed(f"{self.Type} Error: overlay weight must be a number")
self.overlay_weight = pri
if ("group" in data[methods["overlay"]] or "weight" in data[methods["overlay"]]) and (not self.overlay_group or self.overlay_weight is None):
raise Failed(f"{self.Type} Error: overlay group and overlay weight must be used together")
x_coordinate = None
if "x_coordinate" in data[methods["overlay"]] and data[methods["overlay"]]["x_coordinate"] is not None:
x_coordinate = util.check_num(data[methods["overlay"]]["x_coordinate"])
if x_coordinate is None or x_coordinate < 0:
raise Failed(f"{self.Type} Error: overlay x_coordinate: {data[methods['overlay']]['x_coordinate']} invalid must be a number 0 or greater")
y_coordinate = None
if "y_coordinate" in data[methods["overlay"]] and data[methods["overlay"]]["y_coordinate"] is not None:
y_coordinate = util.check_num(data[methods["overlay"]]["y_coordinate"])
if y_coordinate is None or y_coordinate < 0:
raise Failed(f"{self.Type} Error: overlay y_coordinate: {data[methods['overlay']]['y_coordinate']} invalid must be a number 0 or greater")
if ("x_coordinate" in data[methods["overlay"]] or "y_coordinate" in data[methods["overlay"]]) and (x_coordinate is None or y_coordinate is None):
raise Failed(f"{self.Type} Error: overlay x_coordinate and overlay y_coordinate must be used together")
if x_coordinate is not None or y_coordinate is not None:
self.overlay_coordinates = (x_coordinate, y_coordinate)
def get_and_save_image(image_url):
response = self.config.get(image_url)
if response.status_code >= 400:
raise Failed(f"{self.Type} Error: Overlay Image not found at: {image_url}")
if "Content-Type" not in response.headers or response.headers["Content-Type"] != "image/png":
raise Failed(f"{self.Type} Error: Overlay Image not a png: {image_url}")
if not os.path.exists(library.overlay_folder) or not os.path.isdir(library.overlay_folder):
os.makedirs(library.overlay_folder, exist_ok=False)
logger.info(f"Creating Overlay Folder found at: {library.overlay_folder}")
clean_image_name, _ = util.validate_filename(self.overlay)
image_path = os.path.join(library.overlay_folder, f"{clean_image_name}.png")
if os.path.exists(image_path):
os.remove(image_path)
with open(image_path, "wb") as handler:
handler.write(response.content)
while util.is_locked(image_path):
time.sleep(1)
return image_path
if "file" in data[methods["overlay"]] and data[methods["overlay"]]["file"]:
self.overlay_path = data[methods["overlay"]]["file"]
elif "git" in data[methods["overlay"]] and data[methods["overlay"]]["git"]:
self.overlay_path = get_and_save_image(f"{util.github_base}{data[methods['overlay']]['git']}.png")
elif "repo" in data[methods["overlay"]] and data[methods["overlay"]]["repo"]:
self.overlay_path = get_and_save_image(f"{self.config.custom_repo}{data[methods['overlay']]['repo']}.png")
elif "url" in data[methods["overlay"]] and data[methods["overlay"]]["url"]:
self.overlay_path = get_and_save_image(data[methods["overlay"]]["url"])
else: else:
self.overlay = str(data[methods["overlay"]]) overlay_data = str(self.mapping_name)
else:
self.overlay = str(self.mapping_name)
logger.warning(f"{self.Type} Warning: No overlay attribute using mapping name {self.mapping_name} as the overlay name") logger.warning(f"{self.Type} Warning: No overlay attribute using mapping name {self.mapping_name} as the overlay name")
if self.overlay.startswith("blur"): suppress = []
try:
match = re.search("\\(([^)]+)\\)", self.overlay)
if not match or 0 >= int(match.group(1)) > 100:
raise ValueError
self.overlay = f"blur({match.group(1)})"
except ValueError:
logger.error(f"Overlay Error: failed to parse overlay blur name: {self.overlay} defaulting to blur(50)")
self.overlay = "blur(50)"
else:
if "|" in self.overlay:
raise Failed(f"{self.Type} Error: Overlay Name: {self.overlay} cannot contain '|'")
if not self.overlay_path:
clean_name, _ = util.validate_filename(self.overlay)
self.overlay_path = os.path.join(library.overlay_folder, f"{clean_name}.png")
if not os.path.exists(self.overlay_path):
raise Failed(f"{self.Type} Error: Overlay Image not found at: {self.overlay_path}")
if "suppress_overlays" in methods: if "suppress_overlays" in methods:
logger.debug("") logger.debug("")
logger.debug("Validating Method: suppress_overlays") logger.debug("Validating Method: suppress_overlays")
logger.debug(f"Value: {data[methods['suppress_overlays']]}") logger.debug(f"Value: {data[methods['suppress_overlays']]}")
if data[methods["suppress_overlays"]]: if data[methods["suppress_overlays"]]:
self.suppress_overlays = util.get_list(data[methods["suppress_overlays"]]) suppress = util.get_list(data[methods["suppress_overlays"]])
else: else:
logger.error(f"{self.Type} Error: suppress_overlays attribute is blank") logger.error(f"Overlay Error: suppress_overlays attribute is blank")
self.overlay = Overlay(config, library, overlay_data, suppress)
self.sync_to_users = None self.sync_to_users = None
self.valid_users = [] self.valid_users = []

@ -243,6 +243,13 @@ class Cache:
date TEXT, date TEXT,
expiration_date TEXT)""" expiration_date TEXT)"""
) )
cursor.execute(
"""CREATE TABLE IF NOT EXISTS overlay_ratings (
key INTEGER PRIMARY KEY,
rating_key INTEGER,
type TEXT,
rating REAL)"""
)
cursor.execute("SELECT count(name) FROM sqlite_master WHERE type='table' AND name='image_map'") cursor.execute("SELECT count(name) FROM sqlite_master WHERE type='table' AND name='image_map'")
if cursor.fetchone()[0] > 0: if cursor.fetchone()[0] > 0:
cursor.execute(f"SELECT DISTINCT library FROM image_map") cursor.execute(f"SELECT DISTINCT library FROM image_map")
@ -855,3 +862,21 @@ class Cache:
cursor.executemany("UPDATE ergast_race SET name = ?, date = ?, expiration_date = ? WHERE season = ? AND round = ?", cursor.executemany("UPDATE ergast_race SET name = ?, date = ?, expiration_date = ? WHERE season = ? AND round = ?",
[(r.name, r.date.strftime("%Y-%m-%d") if r.date else None, [(r.name, r.date.strftime("%Y-%m-%d") if r.date else None,
expiration_date.strftime("%Y-%m-%d"), r.season, r.round) for r in races]) expiration_date.strftime("%Y-%m-%d"), r.season, r.round) for r in races])
def query_overlay_ratings(self, rating_key, rating_type):
rating = None
with sqlite3.connect(self.cache_path) as connection:
connection.row_factory = sqlite3.Row
with closing(connection.cursor()) as cursor:
cursor.execute("SELECT * FROM overlay_ratings WHERE rating_key = ? AND type = ?", (rating_key, rating_type))
row = cursor.fetchone()
if row:
rating = row["rating"]
return rating
def update_overlay_ratings(self, rating_key, rating_type, rating):
with sqlite3.connect(self.cache_path) as connection:
connection.row_factory = sqlite3.Row
with closing(connection.cursor()) as cursor:
cursor.execute("INSERT OR IGNORE INTO overlay_ratings(rating_key, type) VALUES(?, ?)", (rating_key, rating_type))
cursor.execute("UPDATE overlay_ratings SET rating = ? WHERE rating_key = ? AND type = ?", (rating, rating_key, rating_type))

@ -317,7 +317,12 @@ class ConfigFile:
"check_nightly": check_for_attribute(self.data, "check_nightly", parent="settings", var_type="bool", default=False), "check_nightly": check_for_attribute(self.data, "check_nightly", parent="settings", var_type="bool", default=False),
"assets_for_all": check_for_attribute(self.data, "assets_for_all", parent="settings", var_type="bool", default=False, save=False, do_print=False) "assets_for_all": check_for_attribute(self.data, "assets_for_all", parent="settings", var_type="bool", default=False, save=False, do_print=False)
} }
self.custom_repo = self.general["custom_repo"].replace("https://github.com/", "https://raw.githubusercontent.com/") if self.general["custom_repo"] else None self.custom_repo = None
if self.general["custom_repo"]:
repo = self.general["custom_repo"]
if "https://github.com/" in repo:
repo = repo.replace("https://github.com/", "https://raw.githubusercontent.com/").replace("/tree/", "/")
self.custom_repo = repo
self.check_nightly = self.general["check_nightly"] self.check_nightly = self.general["check_nightly"]
self.latest_version = util.current_version(self.version, nightly=self.check_nightly) self.latest_version = util.current_version(self.version, nightly=self.check_nightly)

@ -461,7 +461,7 @@ class MetadataFile(DataFile):
ending = datetime.now().year - (0 if len(year_values) == 1 else int(year_values[1].strip())) ending = datetime.now().year - (0 if len(year_values) == 1 else int(year_values[1].strip()))
else: else:
ending = util.parse("Config", "ending", dynamic_data, parent=f"{map_name} data", methods=number_methods, datatype="int", default=0, minimum=1) ending = util.parse("Config", "ending", dynamic_data, parent=f"{map_name} data", methods=number_methods, datatype="int", default=0, minimum=1)
increment = util.parse("Config", "increment", dynamic_data, parent=f"{map_name} data", methods=number_methods, datatype="int", default=1, minimum=1) increment = util.parse("Config", "increment", dynamic_data, parent=f"{map_name} data", methods=number_methods, datatype="int", default=1, minimum=1) if "increment" in number_methods else 1
if starting > ending: if starting > ending:
raise Failed(f"Config Error: {map_name} data ending must be greater than starting") raise Failed(f"Config Error: {map_name} data ending must be greater than starting")
current = starting current = starting

@ -92,7 +92,7 @@ class Operations:
found_season = False found_season = False
found_episode = False found_episode = False
for season in self.library.query(item.seasons): for season in self.library.query(item.seasons):
season_poster, season_background, _, _ = self.library.find_item_assets(season, item_asset_directory=item_dir, top_item=item) season_poster, season_background, _, _ = self.library.find_item_assets(season, item_asset_directory=item_dir)
if season_poster: if season_poster:
found_season = True found_season = True
elif self.library.show_missing_season_assets and season.seasonNumber > 0: elif self.library.show_missing_season_assets and season.seasonNumber > 0:
@ -101,7 +101,7 @@ class Operations:
self.library.upload_images(season, poster=season_poster, background=season_background) self.library.upload_images(season, poster=season_poster, background=season_background)
for episode in self.library.query(season.episodes): for episode in self.library.query(season.episodes):
if episode.seasonEpisode: if episode.seasonEpisode:
episode_poster, episode_background, _, _ = self.library.find_item_assets(episode, item_asset_directory=item_dir, top_item=item) episode_poster, episode_background, _, _ = self.library.find_item_assets(episode, item_asset_directory=item_dir)
if episode_poster or episode_background: if episode_poster or episode_background:
found_episode = True found_episode = True
self.library.upload_images(episode, poster=episode_poster, background=episode_background) self.library.upload_images(episode, poster=episode_poster, background=episode_background)
@ -113,7 +113,7 @@ class Operations:
missing_assets = "" missing_assets = ""
found_album = False found_album = False
for album in self.library.query(item.albums): for album in self.library.query(item.albums):
album_poster, album_background, _, _ = self.library.find_item_assets(album, item_asset_directory=item_dir, top_item=item) album_poster, album_background, _, _ = self.library.find_item_assets(album, item_asset_directory=item_dir)
if album_poster or album_background: if album_poster or album_background:
found_album = True found_album = True
elif self.library.show_missing_season_assets: elif self.library.show_missing_season_assets:

@ -1,12 +1,12 @@
import os, re, time import os, re, time
from datetime import datetime from datetime import datetime
from modules import util from modules import plex, util
from modules.builder import CollectionBuilder from modules.builder import CollectionBuilder
from modules.util import Failed, NotScheduled from modules.util import Failed, NotScheduled
from plexapi.audio import Album from plexapi.audio import Album
from plexapi.exceptions import BadRequest from plexapi.exceptions import BadRequest
from plexapi.video import Movie, Show, Season, Episode from plexapi.video import Movie, Show, Season, Episode
from PIL import Image, ImageFilter from PIL import Image, ImageDraw, ImageFilter, ImageFont
logger = util.logger logger = util.logger
@ -81,17 +81,42 @@ class Overlays:
overlay_compare = [] if overlay_compare is None else util.get_list(overlay_compare, split="|") overlay_compare = [] if overlay_compare is None else util.get_list(overlay_compare, split="|")
has_overlay = any([item_tag.tag.lower() == "overlay" for item_tag in item.labels]) has_overlay = any([item_tag.tag.lower() == "overlay" for item_tag in item.labels])
compare_names = {f"{on}{properties[on]['coordinates']}" if properties[on]["coordinates"] else on: on for on in over_names} compare_names = {properties[ov].get_overlay_compare(): ov for ov in over_names}
blur_num = 0
text_names = []
normal_overlays = []
for over_name in over_names:
if over_name.startswith("blur"):
blur_test = int(re.search("\\(([^)]+)\\)", over_name).group(1))
if blur_test > blur_num:
blur_num = blur_test
elif over_name.startswith("text"):
text_names.append(over_name)
else:
normal_overlays.append(over_name)
overlay_change = False if has_overlay else True overlay_change = False if has_overlay else True
if not overlay_change: if not overlay_change:
for oc in overlay_compare: for oc in overlay_compare:
if oc not in compare_names: if oc not in compare_names:
overlay_change = True overlay_change = True
if not overlay_change: if not overlay_change:
for over_name in compare_names: for compare_name, original_name in compare_names.items():
if over_name not in overlay_compare or properties[compare_names[over_name]]["updated"]: if compare_name not in overlay_compare or properties[original_name].updated:
overlay_change = True overlay_change = True
if text_names and self.config.Cache:
for over_name in text_names:
rating_type = over_name[5:-1]
if rating_type in ["audience_rating", "critic_rating", "user_rating"]:
cache_rating = self.config.Cache.query_overlay_ratings(item.ratingKey, rating_type)
actual = plex.attribute_translation[rating_type]
if not hasattr(item, actual) or getattr(item, actual) is None:
continue
if getattr(item, actual) != cache_rating:
overlay_change = True
try: try:
poster, _, item_dir, _ = self.library.find_item_assets(item) poster, _, item_dir, _ = self.library.find_item_assets(item)
if not poster and self.library.assets_for_all and self.library.show_missing_assets: if not poster and self.library.assets_for_all and self.library.show_missing_assets:
@ -135,24 +160,35 @@ class Overlays:
logger.error(f"{item_title[:60]:<60} | Overlay Error: No poster found") logger.error(f"{item_title[:60]:<60} | Overlay Error: No poster found")
elif changed_image or overlay_change: elif changed_image or overlay_change:
try: try:
new_poster = Image.open(poster.location if poster else has_original).convert("RGBA") new_poster = Image.open(poster.location if poster else has_original) \
temp = os.path.join(self.library.overlay_folder, f"temp.png") .convert("RGBA") \
blur_num = 0 .resize((1920, 1080) if isinstance(item, Episode) else (1000, 1500), Image.ANTIALIAS)
for over_name in over_names:
if over_name.startswith("blur"):
blur_test = int(re.search("\\(([^)]+)\\)", over_name).group(1))
if blur_test > blur_num:
blur_num = blur_test
if blur_num > 0: if blur_num > 0:
new_poster = new_poster.filter(ImageFilter.GaussianBlur(blur_num)) new_poster = new_poster.filter(ImageFilter.GaussianBlur(blur_num))
for over_name in over_names: for over_name in normal_overlays:
if not over_name.startswith("blur"): overlay = properties[over_name]
if properties[over_name]["coordinates"]: if overlay.coordinates:
new_poster = new_poster.resize((1920, 1080) if isinstance(item, Episode) else (1000, 1500), Image.ANTIALIAS) new_poster.paste(overlay.image, overlay.coordinates, overlay.image)
new_poster.paste(properties[over_name]["image"], properties[over_name]["coordinates"], properties[over_name]["image"])
else: else:
new_poster = new_poster.resize(properties[over_name]["image"].size, Image.ANTIALIAS) new_poster = new_poster.resize(overlay.image.size, Image.ANTIALIAS)
new_poster.paste(properties[over_name]["image"], (0, 0), properties[over_name]["image"]) new_poster.paste(overlay.image, (0, 0), overlay.image)
if text_names:
drawing = ImageDraw.Draw(new_poster)
for over_name in text_names:
overlay = properties[over_name]
font = ImageFont.truetype(overlay.font, overlay.font_size) if overlay.font else None
text = over_name[5:-1]
if text in ["audience_rating", "critic_rating", "user_rating"]:
rating_type = text
actual = plex.attribute_translation[rating_type]
if not hasattr(item, actual) or getattr(item, actual) is None:
logger.error(f"Overlay Error: No {rating_type} found")
continue
text = getattr(item, actual)
if self.config.Cache:
self.config.Cache.update_overlay_ratings(item.ratingKey, rating_type, text)
drawing.text(overlay.coordinates, str(text), font=font, fill=overlay.font_color)
temp = os.path.join(self.library.overlay_folder, f"temp.png")
new_poster.save(temp, "PNG") new_poster.save(temp, "PNG")
self.library.upload_poster(item, temp) self.library.upload_poster(item, temp)
self.library.edit_tags("label", item, add_tags=["Overlay"], do_print=False) self.library.edit_tags("label", item, add_tags=["Overlay"], do_print=False)
@ -200,12 +236,8 @@ class Overlays:
logger.separator(f"Gathering Items for {k} Overlay", space=False, border=False) logger.separator(f"Gathering Items for {k} Overlay", space=False, border=False)
if builder.overlay not in properties: if builder.overlay.name not in properties:
properties[builder.overlay] = { properties[builder.overlay.name] = builder.overlay
"keys": [], "suppress": builder.suppress_overlays, "group": builder.overlay_group,
"weight": builder.overlay_weight, "path": builder.overlay_path, "updated": False,
"image": None, "coordinates": builder.overlay_coordinates,
}
for method, value in builder.builders: for method, value in builder.builders:
logger.debug("") logger.debug("")
@ -225,41 +257,42 @@ class Overlays:
for item in builder.added_items: for item in builder.added_items:
key_to_item[item.ratingKey] = item key_to_item[item.ratingKey] = item
added_titles.append(item) added_titles.append(item)
if item.ratingKey not in properties[builder.overlay]["keys"]: if item.ratingKey not in properties[builder.overlay.name].keys:
properties[builder.overlay]["keys"].append(item.ratingKey) properties[builder.overlay.name].keys.append(item.ratingKey)
if added_titles: if added_titles:
logger.debug(f"{len(added_titles)} Titles Found: {[self.get_item_sort_title(a, atr='title') for a in added_titles]}") logger.debug(f"{len(added_titles)} Titles Found: {[self.get_item_sort_title(a, atr='title') for a in added_titles]}")
logger.info(f"{len(added_titles) if added_titles else 'No'} Items found for {builder.overlay}") logger.info(f"{len(added_titles) if added_titles else 'No'} Items found for {builder.overlay.name}")
except NotScheduled as e: except NotScheduled as e:
logger.info(e) logger.info(e)
except Failed as e: except Failed as e:
logger.stacktrace()
logger.error(e) logger.error(e)
for overlay_name, over_attrs in properties.items(): for overlay_name, over_obj in properties.items():
if over_attrs["group"]: if over_obj.group:
if over_attrs["group"] not in overlay_groups: if over_obj.group not in overlay_groups:
overlay_groups[over_attrs["group"]] = {} overlay_groups[over_obj.group] = {}
overlay_groups[over_attrs["group"]][overlay_name] = over_attrs["weight"] overlay_groups[over_obj.group][overlay_name] = over_obj.weight
for rk in over_attrs["keys"]: for rk in over_obj.keys:
for suppress_name in over_attrs["suppress"]: for suppress_name in over_obj.suppress:
if suppress_name in properties and rk in properties[suppress_name]["keys"]: if suppress_name in properties and rk in properties[suppress_name].keys:
properties[suppress_name]["keys"].remove(rk) properties[suppress_name].keys.remove(rk)
if not overlay_name.startswith("blur"): if not overlay_name.startswith(("blur", "text")):
image_compare = None image_compare = None
if self.config.Cache: if self.config.Cache:
_, image_compare, _ = self.config.Cache.query_image_map(overlay_name, f"{self.library.image_table_name}_overlays") _, image_compare, _ = self.config.Cache.query_image_map(overlay_name, f"{self.library.image_table_name}_overlays")
overlay_size = os.stat(properties[overlay_name]["path"]).st_size overlay_size = os.stat(over_obj.path).st_size
properties[overlay_name]["updated"] = not image_compare or str(overlay_size) != str(image_compare) over_obj.updated = not image_compare or str(overlay_size) != str(image_compare)
try: try:
properties[overlay_name]["image"] = Image.open(properties[overlay_name]["path"]).convert("RGBA") over_obj.image = Image.open(over_obj.path).convert("RGBA")
if self.config.Cache: if self.config.Cache:
self.config.Cache.update_image_map(overlay_name, f"{self.library.image_table_name}_overlays", overlay_name, overlay_size) self.config.Cache.update_image_map(overlay_name, f"{self.library.image_table_name}_overlays", overlay_name, overlay_size)
except OSError: except OSError:
logger.error(f"Overlay Error: overlay image {properties[overlay_name]['path']} failed to load") logger.error(f"Overlay Error: overlay image {over_obj.path} failed to load")
properties.pop(overlay_name) properties.pop(overlay_name)
for overlay_name, over_attrs in properties.items(): for overlay_name, over_obj in properties.items():
for over_key in over_attrs["keys"]: for over_key in over_obj.keys:
if over_key not in key_to_overlays: if over_key not in key_to_overlays:
key_to_overlays[over_key] = (key_to_item[over_key], []) key_to_overlays[over_key] = (key_to_item[over_key], [])
key_to_overlays[over_key][1].append(overlay_name) key_to_overlays[over_key][1].append(overlay_name)

@ -891,7 +891,7 @@ class Plex(Library):
title = item.title title = item.title
posters, backgrounds = util.get_image_dicts(group, alias) posters, backgrounds = util.get_image_dicts(group, alias)
try: try:
asset_poster, asset_background, item_dir, _ = self.find_item_assets(item, item_asset_directory=asset_location, top_item=top_item) asset_poster, asset_background, item_dir, _ = self.find_item_assets(item, item_asset_directory=asset_location)
if asset_poster: if asset_poster:
posters["asset_directory"] = asset_poster posters["asset_directory"] = asset_poster
if asset_background: if asset_background:
@ -911,7 +911,7 @@ class Plex(Library):
self.upload_images(item, poster=poster, background=background) self.upload_images(item, poster=poster, background=background)
return asset_location return asset_location
def find_item_assets(self, item, item_asset_directory=None, asset_directory=None, top_item=None): def find_item_assets(self, item, item_asset_directory=None, asset_directory=None):
poster = None poster = None
background = None background = None
folder_name = None folder_name = None
@ -921,13 +921,13 @@ class Plex(Library):
is_top_level = isinstance(item, (Movie, Artist, Show, Collection, Playlist, str)) is_top_level = isinstance(item, (Movie, Artist, Show, Collection, Playlist, str))
if isinstance(item, Album): if isinstance(item, Album):
prefix = f"{top_item.title} Album {item.title}'s " prefix = f"{item.parentTitle} Album {item.title}'s "
file_name = item.title file_name = item.title
elif isinstance(item, Season): elif isinstance(item, Season):
prefix = f"{top_item.title} Season {item.seasonNumber}'s " prefix = f"{item.parentTitle} Season {item.seasonNumber}'s "
file_name = f"Season{'0' if item.seasonNumber < 10 else ''}{item.seasonNumber}" file_name = f"Season{'0' if item.seasonNumber < 10 else ''}{item.seasonNumber}"
elif isinstance(item, Episode): elif isinstance(item, Episode):
prefix = f"{top_item.title} {item.seasonEpisode.upper()}'s " prefix = f"{item.grandparentTitle} {item.seasonEpisode.upper()}'s "
file_name = item.seasonEpisode.upper() file_name = item.seasonEpisode.upper()
else: else:
prefix = f"{item if isinstance(item, str) else item.title}'s " prefix = f"{item if isinstance(item, str) else item.title}'s "
@ -1140,7 +1140,6 @@ class Plex(Library):
return True return True
def check_filter(self, item, filter_attr, modifier, filter_final, filter_data, current_time): def check_filter(self, item, filter_attr, modifier, filter_final, filter_data, current_time):
filter_actual = attribute_translation[filter_attr] if filter_attr in attribute_translation else filter_attr filter_actual = attribute_translation[filter_attr] if filter_attr in attribute_translation else filter_attr
if isinstance(item, Movie): if isinstance(item, Movie):
item_type = "movie" item_type = "movie"
@ -1169,8 +1168,7 @@ class Plex(Library):
if filter_attr == "audio_track_title": if filter_attr == "audio_track_title":
for media in item.media: for media in item.media:
for part in media.parts: for part in media.parts:
values.extend( values.extend([a.extendedDisplayTitle for a in part.audioStreams() if a.extendedDisplayTitle])
[a.extendedDisplayTitle for a in part.audioStreams() if a.extendedDisplayTitle])
elif filter_attr == "filepath": elif filter_attr == "filepath":
values = [loc for loc in item.locations] values = [loc for loc in item.locations]
else: else:

@ -4,6 +4,7 @@ from pathvalidate import is_valid_filename, sanitize_filename
from plexapi.audio import Album, Track from plexapi.audio import Album, Track
from plexapi.exceptions import BadRequest, NotFound, Unauthorized from plexapi.exceptions import BadRequest, NotFound, Unauthorized
from plexapi.video import Season, Episode, Movie from plexapi.video import Season, Episode, Movie
from PIL import ImageColor
try: try:
import msvcrt import msvcrt
@ -776,6 +777,23 @@ def check_time(message, end=False):
logger.debug(f"{message}: {current_time - previous_time}") logger.debug(f"{message}: {current_time - previous_time}")
previous_time = None if end else current_time previous_time = None if end else current_time
def get_system_fonts():
dirs = []
if sys.platform == "win32":
windir = os.environ.get("WINDIR")
if windir:
dirs.append(os.path.join(windir, "fonts"))
elif sys.platform in ("linux", "linux2"):
lindirs = os.environ.get("XDG_DATA_DIRS", "")
if not lindirs:
lindirs = "/usr/share"
dirs += [os.path.join(lindir, "fonts") for lindir in lindirs.split(":")]
elif sys.platform == "darwin":
dirs += ["/Library/Fonts", "/System/Library/Fonts", os.path.expanduser("~/Library/Fonts")]
else:
return dirs
return [n for d in dirs for _, _, ns in os.walk(d) for n in ns]
class YAML: class YAML:
def __init__(self, path=None, input_data=None, check_empty=False, create=False): def __init__(self, path=None, input_data=None, check_empty=False, create=False):
self.path = path self.path = path
@ -807,3 +825,141 @@ class YAML:
if self.path: if self.path:
with open(self.path, 'w') as fp: with open(self.path, 'w') as fp:
self.yaml.dump(self.data, fp) self.yaml.dump(self.data, fp)
class Overlay:
def __init__(self, config, library, overlay_data, suppress):
self.config = config
self.library = library
self.data = overlay_data
self.suppress = suppress
self.keys = []
self.updated = False
self.image = None
self.group = None
self.weight = None
self.path = None
self.coordinates = None
self.font = None
self.font_size = 12
self.font_color = None
logger.debug("")
logger.debug("Validating Method: overlay")
logger.debug(f"Value: {self.data}")
if isinstance(self.data, dict):
if "name" not in self.data or not self.data["name"]:
raise Failed(f"Overlay Error: overlay must have the name attribute")
self.name = str(self.data["name"])
if "group" in self.data and self.data["group"]:
self.group = str(self.data["group"])
if "weight" in self.data and self.data["weight"] is not None:
pri = check_num(self.data["weight"])
if pri is None:
raise Failed(f"Overlay Error: overlay weight must be a number")
self.weight = pri
if ("group" in self.data or "weight" in self.data) and (self.weight is None or not self.group):
raise Failed(f"Overlay Error: overlay attribute's group and weight must be used together")
x_cord = None
y_cord = None
if "x_coordinate" in self.data and self.data["x_coordinate"] is not None:
x_cord = check_num(self.data["x_coordinate"])
if x_cord is None or x_cord < 0:
raise Failed(f"Overlay Error: overlay x_coordinate: {self.data['x_coordinate']} must be a number 0 or greater")
if "y_coordinate" in self.data and self.data["y_coordinate"] is not None:
y_cord = check_num(self.data["y_coordinate"])
if y_cord is None or y_cord < 0:
raise Failed(f"Overlay Error: overlay y_coordinate: {self.data['y_coordinate']} must be a number 0 or greater")
if ("x_coordinate" in self.data or "y_coordinate" in self.data) and (x_cord is None or y_cord is None):
raise Failed(f"Overlay Error: overlay x_coordinate and overlay y_coordinate must be used together")
if x_cord is not None or y_cord is not None:
self.coordinates = (x_cord, y_cord)
def get_and_save_image(image_url):
response = self.config.get(image_url)
if response.status_code >= 400:
raise Failed(f"Overlay Error: Overlay Image not found at: {image_url}")
if "Content-Type" not in response.headers or response.headers["Content-Type"] != "image/png":
raise Failed(f"Overlay Error: Overlay Image not a png: {image_url}")
if not os.path.exists(library.overlay_folder) or not os.path.isdir(library.overlay_folder):
os.makedirs(library.overlay_folder, exist_ok=False)
logger.info(f"Creating Overlay Folder found at: {library.overlay_folder}")
clean_image_name, _ = validate_filename(self.name)
image_path = os.path.join(library.overlay_folder, f"{clean_image_name}.png")
if os.path.exists(image_path):
os.remove(image_path)
with open(image_path, "wb") as handler:
handler.write(response.content)
while is_locked(image_path):
time.sleep(1)
return image_path
if not self.name.startswith(("blur", "text")):
if "file" in self.data and self.data["file"]:
self.path = self.data["file"]
elif "git" in self.data and self.data["git"]:
self.path = get_and_save_image(f"{github_base}{self.data['git']}.png")
elif "repo" in self.data and self.data["repo"]:
self.path = get_and_save_image(f"{self.config.custom_repo}{self.data['repo']}.png")
elif "url" in self.data and self.data["url"]:
self.path = get_and_save_image(self.data["url"])
if self.name.startswith("blur"):
try:
match = re.search("\\(([^)]+)\\)", self.name)
if not match or 0 >= int(match.group(1)) > 100:
raise ValueError
self.name = f"blur({match.group(1)})"
except ValueError:
logger.error(f"Overlay Error: failed to parse overlay blur name: {self.name} defaulting to blur(50)")
self.name = "blur(50)"
elif self.name.startswith("text"):
if not self.coordinates:
raise Failed(f"Overlay Error: overlay attribute's x_coordinate and y_coordinate are required when using text")
match = re.search("\\(([^)]+)\\)", self.name)
if not match:
raise Failed(f"Overlay Error: failed to parse overlay text name: {self.name}")
self.name = f"text({match.group(1)})"
if "font" in self.data and self.data["font"]:
font = str(self.data["font"])
if not os.path.exists(font):
fonts = get_system_fonts()
if font not in fonts:
raise Failed(f"Overlay Error: font: {font} not found. Options: {', '.join(fonts)}")
self.font = font
if "font_size" in self.data and self.data["font_size"] is not None:
font_size = check_num(self.data["font_size"])
if font_size is None or font_size < 1:
logger.error(f"Overlay Error: overlay font_size: {self.data['font_size']} invalid must be a greater than 0")
else:
self.font_size = font_size
if "font_color" in self.data and self.data["font_color"]:
try:
color_str = self.data["font_color"]
color_str = color_str if color_str.startswith("#") else f"#{color_str}"
self.font_color = ImageColor.getcolor(color_str, "RGB")
except ValueError:
logger.error(f"Overlay Error: overlay color: {self.data['color']} invalid")
else:
if "|" in self.name:
raise Failed(f"Overlay Error: Overlay Name: {self.name} cannot contain '|'")
if not self.path:
clean_name, _ = validate_filename(self.name)
self.path = os.path.join(library.overlay_folder, f"{clean_name}.png")
if not os.path.exists(self.path):
raise Failed(f"Overlay Error: Overlay Image not found at: {self.path}")
else:
self.name = str(self.data)
logger.warning(f"Overlay Warning: No overlay attribute using mapping name {self.data} as the overlay name")
def get_overlay_compare(self):
output = self.name
if self.group:
output += f"{self.group}{self.weight}"
if self.coordinates:
output += str(self.coordinates)
if self.font:
output += f"{self.font}{self.font_size}"
if self.font_color:
output += str(self.font_color)
return output

Loading…
Cancel
Save