You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Plex-Meta-Manager/modules/library.py

390 lines
19 KiB

import os, time
3 years ago
from abc import ABC, abstractmethod
from modules import util, operations
from modules.meta import MetadataFile, OverlayFile
from modules.operations import Operations
from modules.util import Failed, NotScheduled, YAML
from PIL import Image
3 years ago
logger = util.logger
3 years ago
class Library(ABC):
def __init__(self, config, params):
self.Radarr = None
self.Sonarr = None
self.Tautulli = None
3 years ago
self.Webhooks = None
self.Operations = Operations(config, self)
self.Overlays = None
3 years ago
self.collections = []
self.collection_names = []
3 years ago
self.metadatas = []
self.queues = {}
self.image_styles = {}
self.collection_images = {}
self.queue_current = 0
self.collection_files = []
3 years ago
self.metadata_files = []
self.overlay_files = []
self.images_files = []
3 years ago
self.movie_map = {}
self.show_map = {}
self.imdb_map = {}
self.anidb_map = {}
self.mal_map = {}
self.movie_rating_key_map = {}
self.show_rating_key_map = {}
self.imdb_rating_key_map = {}
self.cached_items = {}
3 years ago
self.run_again = []
self.type = ""
self.config = config
self.name = params["name"]
self.original_mapping_name = params["mapping_name"]
self.scanned_collection_files = params["collection_files"]
self.scanned_metadata_files = params["metadata_files"]
self.scanned_overlay_files = params["overlay_files"]
self.scanned_image_files = params["image_files"]
self.skip_library = params["skip_library"]
self.asset_depth = params["asset_depth"]
3 years ago
self.asset_directory = params["asset_directory"] if params["asset_directory"] else []
3 years ago
self.default_dir = params["default_dir"]
self.mapping_name, output = util.validate_filename(self.original_mapping_name)
self.image_table_name = self.config.Cache.get_image_table_name(self.original_mapping_name) if self.config.Cache else None
self.overlay_folder = os.path.join(self.config.default_dir, "overlays")
self.overlay_backup = os.path.join(self.overlay_folder, f"{self.mapping_name} Original Posters")
self.report_path = params["report_path"] if params["report_path"] else os.path.join(self.default_dir, f"{self.mapping_name}_report.yml")
self.report_data = {}
self.run_order = params["run_order"]
3 years ago
self.asset_folders = params["asset_folders"]
self.create_asset_folders = params["create_asset_folders"]
self.dimensional_asset_rename = params["dimensional_asset_rename"]
self.prioritize_assets = params["prioritize_assets"]
self.download_url_assets = params["download_url_assets"]
self.show_missing_season_assets = params["show_missing_season_assets"]
self.show_missing_episode_assets = params["show_missing_episode_assets"]
self.show_asset_not_needed = params["show_asset_not_needed"]
3 years ago
self.sync_mode = params["sync_mode"]
self.default_collection_order = params["default_collection_order"]
self.minimum_items = params["minimum_items"]
self.item_refresh_delay = params["item_refresh_delay"]
self.delete_below_minimum = params["delete_below_minimum"]
self.delete_not_scheduled = params["delete_not_scheduled"]
self.missing_only_released = params["missing_only_released"]
3 years ago
self.show_unmanaged = params["show_unmanaged"]
self.show_unconfigured = params["show_unconfigured"]
3 years ago
self.show_filtered = params["show_filtered"]
3 years ago
self.show_options = params["show_options"]
3 years ago
self.show_missing = params["show_missing"]
self.show_missing_assets = params["show_missing_assets"]
self.save_report = params["save_report"]
self.only_filter_missing = params["only_filter_missing"]
self.ignore_ids = params["ignore_ids"]
self.ignore_imdb_ids = params["ignore_imdb_ids"]
self.overlay_quality = params["overlay_quality"]
self.overlay_filetype = params["overlay_filetype"]
self.assets_for_all = params["assets_for_all"]
self.assets_for_all_collections = False
self.delete_collections = params["delete_collections"]
self.mass_studio_update = params["mass_studio_update"]
3 years ago
self.mass_genre_update = params["mass_genre_update"]
self.mass_audience_rating_update = params["mass_audience_rating_update"]
self.mass_critic_rating_update = params["mass_critic_rating_update"]
self.mass_user_rating_update = params["mass_user_rating_update"]
self.mass_episode_audience_rating_update = params["mass_episode_audience_rating_update"]
self.mass_episode_critic_rating_update = params["mass_episode_critic_rating_update"]
self.mass_episode_user_rating_update = params["mass_episode_user_rating_update"]
self.mass_content_rating_update = params["mass_content_rating_update"]
self.mass_original_title_update = params["mass_original_title_update"]
self.mass_originally_available_update = params["mass_originally_available_update"]
self.mass_imdb_parental_labels = params["mass_imdb_parental_labels"]
self.mass_poster_update = params["mass_poster_update"]
self.mass_background_update = params["mass_background_update"]
3 years ago
self.radarr_add_all_existing = params["radarr_add_all_existing"]
self.radarr_remove_by_tag = params["radarr_remove_by_tag"]
3 years ago
self.sonarr_add_all_existing = params["sonarr_add_all_existing"]
self.sonarr_remove_by_tag = params["sonarr_remove_by_tag"]
self.update_blank_track_titles = params["update_blank_track_titles"]
self.remove_title_parentheses = params["remove_title_parentheses"]
self.remove_overlays = params["remove_overlays"]
3 years ago
self.reapply_overlays = params["reapply_overlays"]
self.reset_overlays = params["reset_overlays"]
self.mass_collection_mode = params["mass_collection_mode"]
self.metadata_backup = params["metadata_backup"]
self.genre_mapper = params["genre_mapper"]
self.content_rating_mapper = params["content_rating_mapper"]
3 years ago
self.changes_webhooks = params["changes_webhooks"]
3 years ago
self.split_duplicates = params["split_duplicates"] # TODO: Here or just in Plex?
self.clean_bundles = params["plex"]["clean_bundles"] # TODO: Here or just in Plex?
self.empty_trash = params["plex"]["empty_trash"] # TODO: Here or just in Plex?
self.optimize = params["plex"]["optimize"] # TODO: Here or just in Plex?
self.stats = {"created": 0, "modified": 0, "deleted": 0, "added": 0, "unchanged": 0, "removed": 0, "radarr": 0, "sonarr": 0, "names": []}
self.status = {}
3 years ago
self.items_library_operation = True if self.assets_for_all or self.mass_genre_update or self.remove_title_parentheses \
or self.mass_audience_rating_update or self.mass_critic_rating_update or self.mass_user_rating_update \
or self.mass_episode_audience_rating_update or self.mass_episode_critic_rating_update or self.mass_episode_user_rating_update \
or self.mass_content_rating_update or self.mass_originally_available_update or self.mass_original_title_update\
or self.mass_imdb_parental_labels or self.genre_mapper or self.content_rating_mapper or self.mass_studio_update\
or self.radarr_add_all_existing or self.sonarr_add_all_existing or self.mass_poster_update or self.mass_background_update else False
self.library_operation = True if self.items_library_operation or self.delete_collections or self.mass_collection_mode \
or self.radarr_remove_by_tag or self.sonarr_remove_by_tag or self.show_unmanaged or self.show_unconfigured \
or self.metadata_backup or self.update_blank_track_titles else False
self.label_operations = True if self.assets_for_all or self.mass_imdb_parental_labels else False
if self.asset_directory:
logger.info("")
for ad in self.asset_directory:
logger.info(f"Using Asset Directory: {ad}")
if output:
logger.info("")
logger.info(output)
def scan_files(self, operations_only, overlays_only, collection_only, metadata_only):
if not operations_only and not overlays_only and not metadata_only:
for file_type, metadata_file, temp_vars, asset_directory in self.scanned_collection_files:
try:
meta_obj = MetadataFile(self.config, self, file_type, metadata_file, temp_vars, asset_directory, "collection")
if meta_obj.collections:
self.collections.extend([c for c in meta_obj.collections])
self.collection_files.append(meta_obj)
except Failed as e:
logger.error(e)
logger.info("Collection File Failed To Load")
except NotScheduled as e:
logger.info("")
logger.separator(f"Skipping {e} Collection File")
if not operations_only and not overlays_only and not collection_only:
for file_type, metadata_file, temp_vars, asset_directory in self.scanned_metadata_files:
try:
meta_obj = MetadataFile(self.config, self, file_type, metadata_file, temp_vars, asset_directory, "metadata")
if meta_obj.metadata:
self.metadatas.extend([m for m in meta_obj.metadata])
self.metadata_files.append(meta_obj)
except Failed as e:
logger.error(e)
logger.info("Metadata File Failed To Load")
except NotScheduled as e:
logger.info("")
logger.separator(f"Skipping {e} Metadata File")
if not operations_only and not collection_only and not metadata_only:
for file_type, overlay_file, temp_vars, asset_directory in self.scanned_overlay_files:
try:
overlay_obj = OverlayFile(self.config, self, file_type, overlay_file, temp_vars, asset_directory, self.queue_current)
self.overlay_files.append(overlay_obj)
for qk, qv in overlay_obj.queues.items():
self.queues[self.queue_current] = qv
self.queue_current += 1
except Failed as e:
logger.error(e)
logger.info("Overlay File Failed To Load")
except NotScheduled as e:
logger.info("")
logger.separator(f"Skipping {e} Overlay File")
if not operations_only and not overlays_only and not collection_only:
for file_type, images_file, temp_vars, asset_directory in self.scanned_image_files:
try:
images_obj = MetadataFile(self.config, self, file_type, images_file, temp_vars, asset_directory, "image")
self.images_files.append(images_obj)
except Failed as e:
logger.error(e)
logger.info("Image File Failed To Load")
except NotScheduled as e:
logger.info("")
logger.separator(f"Skipping {e} Image File")
3 years ago
def upload_images(self, item, poster=None, background=None, overlay=False):
3 years ago
poster_uploaded = False
if poster is not None:
try:
image_compare = None
if self.config.Cache:
_, image_compare, _ = self.config.Cache.query_image_map(item.ratingKey, self.image_table_name)
if not image_compare or str(poster.compare) != str(image_compare):
if overlay:
self.reload(item, force=True)
if overlay and "Overlay" in [la.tag for la in self.item_labels(item)]:
item.removeLabel("Overlay")
3 years ago
self._upload_image(item, poster)
poster_uploaded = True
logger.info(f"Metadata: {poster.attribute} updated {poster.message}")
elif self.show_asset_not_needed:
logger.info(f"Metadata: {poster.prefix}poster update not needed")
3 years ago
except Failed:
logger.stacktrace()
logger.error(f"Metadata: {poster.attribute} failed to update {poster.message}")
3 years ago
background_uploaded = False
if background is not None:
try:
image_compare = None
3 years ago
if self.config.Cache:
_, image_compare, _ = self.config.Cache.query_image_map(item.ratingKey, f"{self.image_table_name}_backgrounds")
if not image_compare or str(background.compare) != str(image_compare):
3 years ago
self._upload_image(item, background)
background_uploaded = True
logger.info(f"Metadata: {background.attribute} updated {background.message}")
elif self.show_asset_not_needed:
logger.info(f"Metadata: {background.prefix}background update not needed")
3 years ago
except Failed:
logger.stacktrace()
logger.error(f"Metadata: {background.attribute} failed to update {background.message}")
3 years ago
if self.config.Cache:
if poster_uploaded:
self.config.Cache.update_image_map(item.ratingKey, self.image_table_name, "", poster.compare if poster else "")
3 years ago
if background_uploaded:
self.config.Cache.update_image_map(item.ratingKey, f"{self.image_table_name}_backgrounds", "", background.compare)
3 years ago
return poster_uploaded, background_uploaded
def get_id_from_maps(self, key):
key = str(key)
if key in self.movie_rating_key_map:
return self.movie_rating_key_map[key]
elif key in self.show_rating_key_map:
return self.show_rating_key_map[key]
@abstractmethod
def notify(self, text, collection=None, critical=True):
pass
@abstractmethod
def notify_delete(self, message):
pass
3 years ago
@abstractmethod
def _upload_image(self, item, image):
pass
@abstractmethod
def upload_poster(self, item, image, url=False):
3 years ago
pass
def poster_update(self, item, image, tmdb=None, title=None):
return self.image_update(item, image, tmdb=tmdb, title=title)
def background_update(self, item, image, tmdb=None, title=None):
return self.image_update(item, image, tmdb=tmdb, title=title, poster=False)
@abstractmethod
def image_update(self, item, image, tmdb=None, title=None, poster=True):
pass
3 years ago
@abstractmethod
def reload(self, item, force=False):
3 years ago
pass
@abstractmethod
def edit_tags(self, attr, obj, add_tags=None, remove_tags=None, sync_tags=None, do_print=True, locked=True, is_locked=None):
3 years ago
pass
@abstractmethod
def item_labels(self, item):
pass
@abstractmethod
def find_poster_url(self, item):
pass
def check_image_for_overlay(self, image_url, image_path, remove=False):
image_path = util.download_image("", image_url, image_path).location
while util.is_locked(image_path):
time.sleep(1)
with Image.open(image_path) as image:
exif_tags = image.getexif()
if 0x04bc in exif_tags and exif_tags[0x04bc] == "overlay":
os.remove(image_path)
9 months ago
raise Failed("This item's poster already has an Overlay. There is no Kometa setting to change; manual attention required.")
if remove:
os.remove(image_path)
else:
return image_path
@abstractmethod
2 years ago
def item_posters(self, item, providers=None):
pass
3 years ago
@abstractmethod
def get_all(self, builder_level=None, load=False):
3 years ago
pass
def add_additions(self, collection, items, is_movie):
self._add_to_file("Added", collection, items, is_movie)
3 years ago
def add_missing(self, collection, items, is_movie):
self._add_to_file("Missing", collection, items, is_movie)
def add_removed(self, collection, items, is_movie):
self._add_to_file("Removed", collection, items, is_movie)
def add_filtered(self, collection, items, is_movie):
self._add_to_file("Filtered", collection, items, is_movie)
def _add_to_file(self, file_type, collection, items, is_movie):
if collection not in self.report_data:
self.report_data[collection] = {}
parts = isinstance(items[0], str)
if parts:
other = f"Parts {file_type}"
section = other
elif is_movie:
other = f"Movies {file_type}"
section = f"{other} (TMDb IDs)"
else:
other = f"Shows {file_type}"
section = f"{other} (TVDb IDs)"
if section not in self.report_data[collection]:
self.report_data[collection][section] = [] if parts else {}
if parts:
self.report_data[collection][section].extend(items)
else:
for title, item_id in items:
if item_id:
self.report_data[collection][section][int(item_id)] = title
else:
if other not in self.report_data[collection]:
self.report_data[collection][other] = []
self.report_data[collection][other].append(title)
yaml = YAML(self.report_path, start_empty=True)
yaml.data = self.report_data
yaml.save()
3 years ago
def cache_items(self):
logger.info("")
logger.separator(f"Caching {self.name} Library Items", space=False, border=False)
logger.info("")
3 years ago
items = self.get_all()
for item in items:
self.cached_items[item.ratingKey] = (item, False)
return items
def map_guids(self, items):
3 years ago
for i, item in enumerate(items, 1):
if isinstance(item, tuple):
logger.ghost(f"Processing: {i}/{len(items)}")
key, guid = item
else:
logger.ghost(f"Processing: {i}/{len(items)} {item.title}")
key = item.ratingKey
guid = item.guid
if key not in self.movie_rating_key_map and key not in self.show_rating_key_map:
if isinstance(item, tuple):
item_type, check_id = self.config.Convert.scan_guid(guid)
id_type, main_id, imdb_id, _ = self.config.Convert.ids_from_cache(key, guid, item_type, check_id, self)
else:
id_type, main_id, imdb_id = self.config.Convert.get_id(item, self)
3 years ago
if main_id:
if id_type == "movie":
self.movie_rating_key_map[key] = main_id[0]
util.add_dict_list(main_id, key, self.movie_map)
3 years ago
elif id_type == "show":
self.show_rating_key_map[key] = main_id[0]
util.add_dict_list(main_id, key, self.show_map)
3 years ago
if imdb_id:
self.imdb_rating_key_map[key] = imdb_id[0]
util.add_dict_list(imdb_id, key, self.imdb_map)
3 years ago
logger.info("")
logger.info(f"Processed {len(items)} {self.type}s")