import logging , os , plexapi , requests
from modules import builder , util
from modules . library import Library
from modules . util import Failed , ImageData
from PIL import Image
from plexapi import utils
from plexapi . exceptions import BadRequest , NotFound , Unauthorized
from plexapi . collection import Collection
from plexapi . playlist import Playlist
from plexapi . server import PlexServer
from plexapi . video import Movie , Show
from retrying import retry
from urllib import parse
from xml . etree . ElementTree import ParseError
logger = logging . getLogger ( " Plex Meta Manager " )
builders = [ " plex_all " , " plex_collectionless " , " plex_search " ]
search_translation = {
" episode_title " : " episode.title " ,
" network " : " show.network " ,
" critic_rating " : " rating " ,
" audience_rating " : " audienceRating " ,
" user_rating " : " userRating " ,
" episode_user_rating " : " episode.userRating " ,
" content_rating " : " contentRating " ,
" episode_year " : " episode.year " ,
" release " : " originallyAvailableAt " ,
" episode_unmatched " : " episode.unmatched " ,
" episode_duplicate " : " episode.duplicate " ,
" added " : " addedAt " ,
" episode_added " : " episode.addedAt " ,
" episode_air_date " : " episode.originallyAvailableAt " ,
" plays " : " viewCount " ,
" episode_plays " : " episode.viewCount " ,
" last_played " : " lastViewedAt " ,
" episode_last_played " : " episode.lastViewedAt " ,
" unplayed " : " unwatched " ,
" episode_unplayed " : " episode.unwatched " ,
" subtitle_language " : " subtitleLanguage " ,
" audio_language " : " audioLanguage " ,
" progress " : " inProgress " ,
" episode_progress " : " episode.inProgress " ,
" unplayed_episodes " : " show.unwatchedLeaves "
}
show_translation = {
" title " : " show.title " ,
" studio " : " show.studio " ,
" rating " : " show.rating " ,
" audienceRating " : " show.audienceRating " ,
" userRating " : " show.userRating " ,
" contentRating " : " show.contentRating " ,
" year " : " show.year " ,
" originallyAvailableAt " : " show.originallyAvailableAt " ,
" unmatched " : " show.unmatched " ,
" genre " : " show.genre " ,
" collection " : " show.collection " ,
" actor " : " show.actor " ,
" addedAt " : " show.addedAt " ,
" viewCount " : " show.viewCount " ,
" lastViewedAt " : " show.lastViewedAt " ,
" resolution " : " episode.resolution " ,
" hdr " : " episode.hdr " ,
" subtitleLanguage " : " episode.subtitleLanguage " ,
" audioLanguage " : " episode.audioLanguage " ,
" trash " : " episode.trash " ,
" label " : " show.label " ,
}
modifier_translation = {
" " : " " , " .not " : " ! " , " .is " : " % 3D " , " .isnot " : " ! % 3D " , " .gt " : " %3E %3E " , " .gte " : " %3E " , " .lt " : " % 3C % 3C " , " .lte " : " % 3C " ,
" .before " : " % 3C % 3C " , " .after " : " %3E %3E " , " .begins " : " % 3C " , " .ends " : " %3E "
}
episode_sorting_options = { " default " : " -1 " , " oldest " : " 0 " , " newest " : " 1 " }
keep_episodes_options = { " all " : 0 , " 5_latest " : 5 , " 3_latest " : 3 , " latest " : 1 , " past_3 " : - 3 , " past_7 " : - 7 , " past_30 " : - 30 }
delete_episodes_options = { " never " : 0 , " day " : 1 , " week " : 7 , " refresh " : 100 }
season_display_options = { " default " : - 1 , " show " : 0 , " hide " : 1 }
episode_ordering_options = { " default " : None , " tmdb_aired " : " tmdbAiring " , " tvdb_aired " : " aired " , " tvdb_dvd " : " dvd " , " tvdb_absolute " : " absolute " }
plex_languages = [ " default " , " ar-SA " , " ca-ES " , " cs-CZ " , " da-DK " , " de-DE " , " el-GR " , " en-AU " , " en-CA " , " en-GB " , " en-US " ,
" es-ES " , " es-MX " , " et-EE " , " fa-IR " , " fi-FI " , " fr-CA " , " fr-FR " , " he-IL " , " hi-IN " , " hu-HU " , " id-ID " ,
" it-IT " , " ja-JP " , " ko-KR " , " lt-LT " , " lv-LV " , " nb-NO " , " nl-NL " , " pl-PL " , " pt-BR " , " pt-PT " , " ro-RO " ,
" ru-RU " , " sk-SK " , " sv-SE " , " th-TH " , " tr-TR " , " uk-UA " , " vi-VN " , " zh-CN " , " zh-HK " , " zh-TW " ]
metadata_language_options = { lang . lower ( ) : lang for lang in plex_languages }
metadata_language_options [ " default " ] = None
use_original_title_options = { " default " : - 1 , " no " : 0 , " yes " : 1 }
collection_mode_options = {
" default " : " default " , " hide " : " hide " ,
" hide_items " : " hideItems " , " hideitems " : " hideItems " ,
" show_items " : " showItems " , " showitems " : " showItems "
}
collection_order_options = [ " release " , " alpha " , " custom " ]
collection_level_options = [ " episode " , " season " ]
collection_mode_keys = { - 1 : " default " , 0 : " hide " , 1 : " hideItems " , 2 : " showItems " }
collection_order_keys = { 0 : " release " , 1 : " alpha " , 2 : " custom " }
item_advance_keys = {
" item_episode_sorting " : ( " episodeSort " , episode_sorting_options ) ,
" item_keep_episodes " : ( " autoDeletionItemPolicyUnwatchedLibrary " , keep_episodes_options ) ,
" item_delete_episodes " : ( " autoDeletionItemPolicyWatchedLibrary " , delete_episodes_options ) ,
" item_season_display " : ( " flattenSeasons " , season_display_options ) ,
" item_episode_ordering " : ( " showOrdering " , episode_ordering_options ) ,
" item_metadata_language " : ( " languageOverride " , metadata_language_options ) ,
" item_use_original_title " : ( " useOriginalTitle " , use_original_title_options )
}
new_plex_agents = [ " tv.plex.agents.movie " , " tv.plex.agents.series " ]
searches = [
" title " , " title.not " , " title.is " , " title.isnot " , " title.begins " , " title.ends " ,
" studio " , " studio.not " , " studio.is " , " studio.isnot " , " studio.begins " , " studio.ends " ,
" actor " , " actor.not " ,
" audio_language " , " audio_language.not " ,
" collection " , " collection.not " ,
" content_rating " , " content_rating.not " ,
" country " , " country.not " ,
" director " , " director.not " ,
" genre " , " genre.not " ,
" label " , " label.not " ,
" network " , " network.not " ,
" producer " , " producer.not " ,
" subtitle_language " , " subtitle_language.not " ,
" writer " , " writer.not " ,
" decade " , " resolution " , " hdr " , " unmatched " , " duplicate " , " unplayed " , " progress " , " trash " ,
" last_played " , " last_played.not " , " last_played.before " , " last_played.after " ,
" added " , " added.not " , " added.before " , " added.after " ,
" release " , " release.not " , " release.before " , " release.after " ,
" duration.gt " , " duration.gte " , " duration.lt " , " duration.lte " ,
" plays.gt " , " plays.gte " , " plays.lt " , " plays.lte " ,
" user_rating.gt " , " user_rating.gte " , " user_rating.lt " , " user_rating.lte " ,
" critic_rating.gt " , " critic_rating.gte " , " critic_rating.lt " , " critic_rating.lte " ,
" audience_rating.gt " , " audience_rating.gte " , " audience_rating.lt " , " audience_rating.lte " ,
" year " , " year.not " , " year.gt " , " year.gte " , " year.lt " , " year.lte " ,
" unplayed_episodes " , " episode_unplayed " , " episode_duplicate " , " episode_progress " , " episode_unmatched " ,
" episode_title " , " episode_title.not " , " episode_title.is " , " episode_title.isnot " , " episode_title.begins " , " episode_title.ends " ,
" episode_added " , " episode_added.not " , " episode_added.before " , " episode_added.after " ,
" episode_air_date " , " episode_air_date.not " , " episode_air_date.before " , " episode_air_date.after " ,
" episode_last_played " , " episode_last_played.not " , " episode_last_played.before " , " episode_last_played.after " ,
" episode_plays.gt " , " episode_plays.gte " , " episode_plays.lt " , " episode_plays.lte " ,
" episode_user_rating.gt " , " episode_user_rating.gte " , " episode_user_rating.lt " , " episode_user_rating.lte " ,
" episode_year " , " episode_year.not " , " episode_year.gt " , " episode_year.gte " , " episode_year.lt " , " episode_year.lte "
]
and_searches = [
" title.and " , " studio.and " , " actor.and " , " audio_language.and " , " collection.and " ,
" content_rating.and " , " country.and " , " director.and " , " genre.and " , " label.and " ,
" network.and " , " producer.and " , " subtitle_language.and " , " writer.and "
]
or_searches = [
" title " , " studio " , " actor " , " audio_language " , " collection " , " content_rating " ,
" country " , " director " , " genre " , " label " , " network " , " producer " , " subtitle_language " ,
" writer " , " decade " , " resolution " , " year " , " episode_title " , " episode_year "
]
movie_only_searches = [
" country " , " country.not " , " director " , " director.not " , " producer " , " producer.not " , " writer " , " writer.not " ,
" decade " , " duplicate " , " unplayed " , " progress " ,
" duration.gt " , " duration.gte " , " duration.lt " , " duration.lte "
]
show_only_searches = [
" network " , " network.not " ,
" episode_title " , " episode_title.not " , " episode_title.is " , " episode_title.isnot " , " episode_title.begins " , " episode_title.ends " ,
" episode_added " , " episode_added.not " , " episode_added.before " , " episode_added.after " ,
" episode_air_date " , " episode_air_date.not " ,
" episode_air_date.before " , " episode_air_date.after " ,
" episode_last_played " , " episode_last_played.not " , " episode_last_played.before " , " episode_last_played.after " ,
" episode_plays.gt " , " episode_plays.gte " , " episode_plays.lt " , " episode_plays.lte " ,
" episode_user_rating.gt " , " episode_user_rating.gte " , " episode_user_rating.lt " , " episode_user_rating.lte " ,
" episode_year " , " episode_year.not " , " episode_year.gt " , " episode_year.gte " , " episode_year.lt " , " episode_year.lte " ,
" unplayed_episodes " , " episode_unplayed " , " episode_duplicate " , " episode_progress " , " episode_unmatched " ,
]
float_attributes = [ " user_rating " , " episode_user_rating " , " critic_rating " , " audience_rating " ]
boolean_attributes = [
" hdr " , " unmatched " , " duplicate " , " unplayed " , " progress " , " trash " ,
" unplayed_episodes " , " episode_unplayed " , " episode_duplicate " , " episode_progress " , " episode_unmatched " ,
]
tmdb_attributes = [ " actor " , " director " , " producer " , " writer " ]
date_attributes = [ " added " , " episode_added " , " release " , " episode_air_date " , " last_played " , " episode_last_played " , " first_episode_aired " , " last_episode_aired " ]
number_attributes = [ " plays " , " episode_plays " , " duration " , " tmdb_vote_count " ] + date_attributes
search_display = { " added " : " Date Added " , " release " : " Release Date " , " hdr " : " HDR " , " progress " : " In Progress " , " episode_progress " : " Episode In Progress " }
tags = [
" actor " , " audio_language " , " collection " , " content_rating " , " country " , " director " , " genre " , " label " ,
" network " , " producer " , " resolution " , " studio " , " subtitle_language " , " writer "
]
movie_sorts = {
" title.asc " : " titleSort " , " title.desc " : " titleSort % 3Adesc " ,
" year.asc " : " year " , " year.desc " : " year % 3Adesc " ,
" originally_available.asc " : " originallyAvailableAt " , " originally_available.desc " : " originallyAvailableAt % 3Adesc " ,
" release.asc " : " originallyAvailableAt " , " release.desc " : " originallyAvailableAt % 3Adesc " ,
" critic_rating.asc " : " rating " , " critic_rating.desc " : " rating % 3Adesc " ,
" audience_rating.asc " : " audienceRating " , " audience_rating.desc " : " audienceRating % 3Adesc " ,
" user_rating.asc " : " userRating " , " user_rating.desc " : " userRating % 3Adesc " ,
" content_rating.asc " : " contentRating " , " content_rating.desc " : " contentRating % 3Adesc " ,
" duration.asc " : " duration " , " duration.desc " : " duration % 3Adesc " ,
" plays.asc " : " viewCount " , " plays.desc " : " viewCount % 3Adesc " ,
" added.asc " : " addedAt " , " added.desc " : " addedAt % 3Adesc " ,
" random " : " random "
}
show_sorts = {
" title.asc " : " titleSort " , " title.desc " : " titleSort % 3Adesc " ,
" year.asc " : " year " , " year.desc " : " year % 3Adesc " ,
" originally_available.asc " : " originallyAvailableAt " , " originally_available.desc " : " originallyAvailableAt % 3Adesc " ,
" release.asc " : " originallyAvailableAt " , " release.desc " : " originallyAvailableAt % 3Adesc " ,
" critic_rating.asc " : " rating " , " critic_rating.desc " : " rating % 3Adesc " ,
" audience_rating.asc " : " audienceRating " , " audience_rating.desc " : " audienceRating % 3Adesc " ,
" user_rating.asc " : " userRating " , " user_rating.desc " : " userRating % 3Adesc " ,
" content_rating.asc " : " contentRating " , " content_rating.desc " : " contentRating % 3Adesc " ,
" added.asc " : " addedAt " , " added.desc " : " addedAt % 3Adesc " ,
" episode_added.asc " : " episode.addedAt " , " episode_added.desc " : " episode.addedAt % 3Adesc " ,
" random " : " random "
}
season_sorts = {
" season.asc " : " season.index % 2Cseason.titleSort " , " season.desc " : " season.index % 3Adesc % 2Cseason.titleSort " ,
" show.asc " : " show.titleSort % 2Cindex " , " show.desc " : " show.titleSort % 3Adesc % 2Cindex " ,
" user_rating.asc " : " userRating " , " user_rating.desc " : " userRating % 3Adesc " ,
" added.asc " : " addedAt " , " added.desc " : " addedAt % 3Adesc " ,
" random " : " random "
}
episode_sorts = {
" title.asc " : " titleSort " , " title.desc " : " titleSort % 3Adesc " ,
" show.asc " : " show.titleSort % 2Cseason.index % 3AnullsLast % 2Cepisode.index % 3AnullsLast % 2Cepisode.originallyAvailableAt % 3AnullsLast % 2Cepisode.titleSort % 2Cepisode.id " ,
" show.desc " : " show.titleSort % 3Adesc % 2Cseason.index % 3AnullsLast % 2Cepisode.index % 3AnullsLast % 2Cepisode.originallyAvailableAt % 3AnullsLast % 2Cepisode.titleSort % 2Cepisode.id " ,
" year.asc " : " year " , " year.desc " : " year % 3Adesc " ,
" originally_available.asc " : " originallyAvailableAt " , " originally_available.desc " : " originallyAvailableAt % 3Adesc " ,
" release.asc " : " originallyAvailableAt " , " release.desc " : " originallyAvailableAt % 3Adesc " ,
" critic_rating.asc " : " rating " , " critic_rating.desc " : " rating % 3Adesc " ,
" audience_rating.asc " : " audienceRating " , " audience_rating.desc " : " audienceRating % 3Adesc " ,
" user_rating.asc " : " userRating " , " user_rating.desc " : " userRating % 3Adesc " ,
" duration.asc " : " duration " , " duration.desc " : " duration % 3Adesc " ,
" plays.asc " : " viewCount " , " plays.desc " : " viewCount % 3Adesc " ,
" added.asc " : " addedAt " , " added.desc " : " addedAt % 3Adesc " ,
" random " : " random "
}
sort_types = { " movies " : ( 1 , movie_sorts ) , " shows " : ( 2 , show_sorts ) , " seasons " : ( 3 , season_sorts ) , " episodes " : ( 4 , episode_sorts ) }
class Plex ( Library ) :
def __init__ ( self , config , params ) :
super ( ) . __init__ ( config , params )
self . plex = params [ " plex " ]
self . url = params [ " plex " ] [ " url " ]
self . token = params [ " plex " ] [ " token " ]
self . timeout = params [ " plex " ] [ " timeout " ]
logger . info ( " " )
try :
self . PlexServer = PlexServer ( baseurl = self . url , token = self . token , session = self . config . session , timeout = self . timeout )
except Unauthorized :
raise Failed ( " Plex Error: Plex token is invalid " )
except ValueError as e :
raise Failed ( f " Plex Error: { e } " )
except ( requests . exceptions . ConnectionError , ParseError ) :
util . print_stacktrace ( )
raise Failed ( " Plex Error: Plex url is invalid " )
self . Plex = None
library_names = [ ]
for s in self . PlexServer . library . sections ( ) :
library_names . append ( s . title )
if s . title == params [ " name " ] :
self . Plex = s
break
if not self . Plex :
raise Failed ( f " Plex Error: Plex Library { params [ ' name ' ] } not found. Options: { library_names } " )
if self . Plex . type in [ " movie " , " show " ] :
self . type = self . Plex . type . capitalize ( )
else :
raise Failed ( f " Plex Error: Plex Library must be a Movies or TV Shows library " )
self . agent = self . Plex . agent
self . is_movie = self . type == " Movie "
self . is_show = self . type == " Show "
self . is_other = self . agent == " com.plexapp.agents.none "
if self . is_other :
self . type = " Video "
if self . tmdb_collections and self . is_show :
self . tmdb_collections = None
logger . error ( " Config Error: tmdb_collections only work with Movie Libraries. " )
def notify ( self , text , collection = None , critical = True ) :
self . config . notify ( text , server = self . PlexServer . friendlyName , library = self . name , collection = collection , critical = critical )
def set_server_preroll ( self , preroll ) :
self . PlexServer . settings . get ( ' cinemaTrailersPrerollID ' ) . set ( preroll )
self . PlexServer . settings . save ( )
def get_all_collections ( self ) :
return self . search ( libtype = " collection " )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def search ( self , title = None , libtype = None , sort = None , maxresults = None , * * kwargs ) :
return self . Plex . search ( title = title , sort = sort , maxresults = maxresults , libtype = libtype , * * kwargs )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def exact_search ( self , title , libtype = None , year = None ) :
if year :
terms = { " title= " : title , " year " : year }
else :
terms = { " title= " : title }
return self . Plex . search ( libtype = libtype , * * terms )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def get_labeled_items ( self , label ) :
return self . Plex . search ( label = label )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def fetchItem ( self , data ) :
return self . PlexServer . fetchItem ( data )
def get_all ( self ) :
logger . info ( f " Loading All { self . type } s from Library: { self . name } " )
key = f " /library/sections/ { self . Plex . key } /all?includeGuids=1&type= { utils . searchType ( self . Plex . TYPE ) } "
container_start = 0
container_size = plexapi . X_PLEX_CONTAINER_SIZE
results = [ ]
while self . Plex . _totalViewSize is None or container_start < = self . Plex . _totalViewSize :
results . extend ( self . fetchItems ( key , container_start , container_size ) )
util . print_return ( f " Loaded: { container_start } / { self . Plex . _totalViewSize } " )
container_start + = container_size
logger . info ( util . adjust_space ( f " Loaded { self . Plex . _totalViewSize } { self . type } s " ) )
return results
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def create_playlist ( self , name , items ) :
return self . PlexServer . createPlaylist ( name , items = items )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def fetchItems ( self , key , container_start , container_size ) :
return self . Plex . fetchItems ( key , container_start = container_start , container_size = container_size )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def moveItem ( self , obj , item , after ) :
obj . moveItem ( item , after = after )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def query ( self , method ) :
return method ( )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def query_data ( self , method , data ) :
return method ( data )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_failed )
def query_collection ( self , item , collection , locked = True , add = True ) :
if add :
item . addCollection ( collection , locked = locked )
else :
item . removeCollection ( collection , locked = locked )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def collection_mode_query ( self , collection , data ) :
collection . modeUpdate ( mode = data )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def collection_order_query ( self , collection , data ) :
collection . sortUpdate ( sort = data )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def reload ( self , item ) :
try :
item . reload ( checkFiles = False , includeAllConcerts = False , includeBandwidths = False , includeChapters = False ,
includeChildren = False , includeConcerts = False , includeExternalMedia = False , includeExtras = False ,
includeFields = False , includeGeolocation = False , includeLoudnessRamps = False , includeMarkers = False ,
includeOnDeck = False , includePopularLeaves = False , includeRelated = False ,
includeRelatedCount = 0 , includeReviews = False , includeStations = False )
except ( BadRequest , NotFound ) as e :
util . print_stacktrace ( )
raise Failed ( f " Item Failed to Load: { e } " )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def edit_query ( self , item , edits , advanced = False ) :
if advanced :
item . editAdvanced ( * * edits )
else :
item . edit ( * * edits )
self . reload ( item )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def _upload_image ( self , item , image ) :
try :
if image . is_poster and image . is_url :
item . uploadPoster ( url = image . location )
elif image . is_poster :
item . uploadPoster ( filepath = image . location )
elif image . is_url :
item . uploadArt ( url = image . location )
else :
item . uploadArt ( filepath = image . location )
self . reload ( item )
except BadRequest as e :
raise Failed ( e )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def upload_file_poster ( self , item , image ) :
item . uploadPoster ( filepath = image )
self . reload ( item )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_failed )
def get_search_choices ( self , search_name , title = True ) :
final_search = search_translation [ search_name ] if search_name in search_translation else search_name
final_search = show_translation [ final_search ] if self . is_show and final_search in show_translation else final_search
try :
names = [ ]
choices = { }
for choice in self . Plex . listFilterChoices ( final_search ) :
if choice . title not in names :
names . append ( choice . title )
if choice . key not in names :
names . append ( choice . key )
choices [ choice . title . lower ( ) ] = choice . title if title else choice . key
choices [ choice . key . lower ( ) ] = choice . title if title else choice . key
return choices , names
except NotFound :
logger . debug ( f " Search Attribute: { final_search } " )
raise Failed ( f " Plex Error: plex_search attribute: { search_name } not supported " )
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def get_labels ( self ) :
return { label . title : label . key for label in self . Plex . listFilterChoices ( field = " label " ) }
@retry ( stop_max_attempt_number = 6 , wait_fixed = 10000 , retry_on_exception = util . retry_if_not_plex )
def _query ( self , key , post = False , put = False ) :
if post : method = self . Plex . _server . _session . post
elif put : method = self . Plex . _server . _session . put
else : method = None
return self . Plex . _server . query ( key , method = method )
def alter_collection ( self , item , collection , smart_label_collection = False , add = True ) :
if smart_label_collection :
self . query_data ( item . addLabel if add else item . removeLabel , collection )
else :
locked = True
if self . agent in [ " tv.plex.agents.movie " , " tv.plex.agents.series " ] :
field = next ( ( f for f in item . fields if f . name == " collection " ) , None )
locked = field is not None
self . query_collection ( item , collection , locked = locked , add = add )
def move_item ( self , collection , item , after = None ) :
key = f " { collection . key } /items/ { item } /move "
if after :
key + = f " ?after= { after } "
self . _query ( key , put = True )
def smart_label_url ( self , title , sort ) :
labels = self . get_labels ( )
if title not in labels :
raise Failed ( f " Plex Error: Label: { title } does not exist " )
smart_type = 1 if self . is_movie else 2
sort_type = movie_sorts [ sort ] if self . is_movie else show_sorts [ sort ]
return smart_type , f " ?type= { smart_type } &sort= { sort_type } &label= { labels [ title ] } "
def test_smart_filter ( self , uri_args ) :
logger . debug ( f " Smart Collection Test: { uri_args } " )
test_items = self . get_filter_items ( uri_args )
if len ( test_items ) < 1 :
raise Failed ( f " Plex Error: No items for smart filter: { uri_args } " )
def create_smart_collection ( self , title , smart_type , uri_args ) :
self . test_smart_filter ( uri_args )
args = {
" type " : smart_type ,
" title " : title ,
" smart " : 1 ,
" sectionId " : self . Plex . key ,
" uri " : self . build_smart_filter ( uri_args )
}
self . _query ( f " /library/collections { utils . joinArgs ( args ) } " , post = True )
def get_smart_filter_from_uri ( self , uri ) :
smart_filter = parse . parse_qs ( parse . urlparse ( uri . replace ( " /#!/ " , " / " ) ) . query ) [ " key " ] [ 0 ]
args = smart_filter [ smart_filter . index ( " ? " ) : ]
return self . build_smart_filter ( args ) , int ( args [ args . index ( " type= " ) + 5 : args . index ( " type= " ) + 6 ] )
def build_smart_filter ( self , uri_args ) :
return f " server:// { self . PlexServer . machineIdentifier } /com.plexapp.plugins.library/library/sections/ { self . Plex . key } /all { uri_args } "
def update_smart_collection ( self , collection , uri_args ) :
self . test_smart_filter ( uri_args )
self . _query ( f " /library/collections/ { collection . ratingKey } /items { utils . joinArgs ( { ' uri ' : self . build_smart_filter ( uri_args ) } ) } " , put = True )
def smart_filter ( self , collection ) :
smart_filter = self . get_collection ( collection ) . content
return smart_filter [ smart_filter . index ( " ? " ) : ]
def collection_visibility ( self , collection ) :
try :
attrs = self . _query ( f " /hubs/sections/ { self . Plex . key } /manage?metadataItemId= { collection . ratingKey } " ) [ 0 ] . attrib
return {
" library " : utils . cast ( bool , attrs . get ( " promotedToRecommended " , " 0 " ) ) ,
" home " : utils . cast ( bool , attrs . get ( " promotedToOwnHome " , " 0 " ) ) ,
" shared " : utils . cast ( bool , attrs . get ( " promotedToSharedHome " , " 0 " ) )
}
except IndexError :
return { " library " : False , " home " : False , " shared " : False }
def collection_visibility_update ( self , collection , visibility = None , library = None , home = None , shared = None ) :
if visibility is None :
visibility = self . collection_visibility ( collection )
key = f " /hubs/sections/ { self . Plex . key } /manage?metadataItemId= { collection . ratingKey } "
key + = f " &promotedToRecommended= { 1 if ( library is None and visibility [ ' library ' ] ) or library else 0 } "
key + = f " &promotedToOwnHome= { 1 if ( home is None and visibility [ ' home ' ] ) or home else 0 } "
key + = f " &promotedToSharedHome= { 1 if ( shared is None and visibility [ ' shared ' ] ) or shared else 0 } "
self . _query ( key , post = True )
def get_playlist ( self , title ) :
try :
return self . PlexServer . playlist ( title )
except NotFound :
raise Failed ( f " Plex Error: Playlist { title } not found " )
def get_collection ( self , data ) :
if isinstance ( data , int ) :
return self . fetchItem ( data )
elif isinstance ( data , Collection ) :
return data
else :
for d in self . search ( title = str ( data ) , libtype = " collection " ) :
if d . title == data :
return d
raise Failed ( f " Plex Error: Collection { data } not found " )
def validate_collections ( self , collections ) :
valid_collections = [ ]
for collection in collections :
try : valid_collections . append ( self . get_collection ( collection ) )
except Failed as e : logger . error ( e )
if len ( valid_collections ) == 0 :
raise Failed ( f " Collection Error: No valid Plex Collections in { collections } " )
return valid_collections
def get_rating_keys ( self , method , data ) :
items = [ ]
if method == " plex_all " :
logger . info ( f " Processing Plex All { self . type } s " )
items = self . get_all ( )
elif method == " plex_search " :
util . print_multiline ( data [ 1 ] , info = True )
items = self . get_filter_items ( data [ 2 ] )
elif method == " plex_collectionless " :
good_collections = [ ]
logger . info ( f " Processing Plex Collectionless " )
logger . info ( " Collections Excluded " )
for col in self . get_all_collections ( ) :
keep_collection = True
for pre in data [ " exclude_prefix " ] :
if col . title . startswith ( pre ) or ( col . titleSort and col . titleSort . startswith ( pre ) ) :
keep_collection = False
logger . info ( f " { col . title } excluded by prefix match { pre } " )
break
if keep_collection :
for ext in data [ " exclude " ] :
if col . title == ext or ( col . titleSort and col . titleSort == ext ) :
keep_collection = False
logger . info ( f " { col . title } excluded by exact match " )
break
if keep_collection :
logger . info ( f " Collection Passed: { col . title } " )
good_collections . append ( col )
logger . info ( " " )
logger . info ( " Collections Not Excluded (Items in these collections are not added to Collectionless) " )
for col in good_collections :
logger . info ( col . title )
collection_indexes = [ c . index for c in good_collections ]
all_items = self . get_all ( )
for i , item in enumerate ( all_items , 1 ) :
util . print_return ( f " Processing: { i } / { len ( all_items ) } { item . title } " )
add_item = True
self . reload ( item )
for collection in item . collections :
if collection . id in collection_indexes :
add_item = False
break
if add_item :
items . append ( item )
logger . info ( util . adjust_space ( f " Processed { len ( all_items ) } { self . type } s " ) )
else :
raise Failed ( f " Plex Error: Method { method } not supported " )
if len ( items ) > 0 :
ids = [ ( item . ratingKey , " ratingKey " ) for item in items ]
logger . debug ( " " )
logger . debug ( f " { len ( ids ) } Keys Found: { ids } " )
return ids
else :
raise Failed ( " Plex Error: No Items found in Plex " )
def get_collection_items ( self , collection , smart_label_collection ) :
if smart_label_collection :
return self . get_labeled_items ( collection . title if isinstance ( collection , Collection ) else str ( collection ) )
elif isinstance ( collection , ( Collection , Playlist ) ) :
if collection . smart :
return self . get_filter_items ( self . smart_filter ( collection ) )
else :
return self . query ( collection . items )
else :
return [ ]
def get_filter_items ( self , uri_args ) :
key = f " /library/sections/ { self . Plex . key } /all { uri_args } "
return self . Plex . _search ( key , None , 0 , plexapi . X_PLEX_CONTAINER_SIZE )
def get_collection_name_and_items ( self , collection , smart_label_collection ) :
name = collection . title if isinstance ( collection , ( Collection , Playlist ) ) else str ( collection )
return name , self . get_collection_items ( collection , smart_label_collection )
def get_tmdb_from_map ( self , item ) :
return self . movie_rating_key_map [ item . ratingKey ] if item . ratingKey in self . movie_rating_key_map else None
def get_tvdb_from_map ( self , item ) :
return self . show_rating_key_map [ item . ratingKey ] if item . ratingKey in self . show_rating_key_map else None
def search_item ( self , data , year = None ) :
kwargs = { }
if year is not None :
kwargs [ " year " ] = year
for d in self . search ( title = str ( data ) , * * kwargs ) :
if d . title == data :
return d
return None
def edit_item ( self , item , name , item_type , edits , advanced = False ) :
if len ( edits ) > 0 :
logger . debug ( f " Details Update: { edits } " )
try :
self . edit_query ( item , edits , advanced = advanced )
if advanced and ( " languageOverride " in edits or " useOriginalTitle " in edits ) :
self . query ( item . refresh )
logger . info ( f " { item_type } : { name } { ' Advanced ' if advanced else ' ' } Details Update Successful " )
return True
except BadRequest :
util . print_stacktrace ( )
logger . error ( f " { item_type } : { name } { ' Advanced ' if advanced else ' ' } Details Update Failed " )
return False
def edit_tags ( self , attr , obj , add_tags = None , remove_tags = None , sync_tags = None ) :
display = " "
key = builder . filter_translation [ attr ] if attr in builder . filter_translation else attr
if add_tags or remove_tags or sync_tags is not None :
_add_tags = add_tags if add_tags else [ ]
_remove_tags = [ t . lower ( ) for t in remove_tags ] if remove_tags else [ ]
_sync_tags = [ t . lower ( ) for t in sync_tags ] if sync_tags else [ ]
try :
self . reload ( obj )
_item_tags = [ item_tag . tag . lower ( ) for item_tag in getattr ( obj , key ) ]
except BadRequest :
_item_tags = [ ]
_add = [ f " { t [ : 1 ] . upper ( ) } { t [ 1 : ] } " for t in _add_tags + _sync_tags if t . lower ( ) not in _item_tags ]
_remove = [ t for t in _item_tags if ( sync_tags is not None and t not in _sync_tags ) or t in _remove_tags ]
if _add :
self . query_data ( getattr ( obj , f " add { attr . capitalize ( ) } " ) , _add )
display + = f " + { ' , + ' . join ( _add ) } "
if _remove :
self . query_data ( getattr ( obj , f " remove { attr . capitalize ( ) } " ) , _remove )
display + = f " - { ' , - ' . join ( _remove ) } "
if len ( display ) > 0 :
logger . info ( f " { obj . title [ : 25 ] : <25 } | { attr . capitalize ( ) } | { display } " )
return len ( display ) > 0
def find_assets ( self , item , name = None , upload = True ) :
if isinstance ( item , Movie ) :
name = os . path . basename ( os . path . dirname ( str ( item . locations [ 0 ] ) ) )
elif isinstance ( item , Show ) :
name = os . path . basename ( str ( item . locations [ 0 ] ) )
elif isinstance ( item , Collection ) :
name = name if name else item . title
else :
return None , None
found_folder = False
poster = None
background = None
for ad in self . asset_directory :
item_dir = None
if self . asset_folders :
if os . path . isdir ( os . path . join ( ad , name ) ) :
item_dir = os . path . join ( ad , name )
else :
for n in range ( 1 , self . asset_depth + 1 ) :
new_path = ad
for i in range ( 1 , n + 1 ) :
new_path = os . path . join ( new_path , " * " )
matches = util . glob_filter ( os . path . join ( new_path , name ) )
if len ( matches ) > 0 :
item_dir = os . path . abspath ( matches [ 0 ] )
break
if item_dir is None :
continue
found_folder = True
poster_filter = os . path . join ( item_dir , " poster.* " )
background_filter = os . path . join ( item_dir , " background.* " )
else :
poster_filter = os . path . join ( ad , f " { name } .* " )
background_filter = os . path . join ( ad , f " { name } _background.* " )
poster_matches = util . glob_filter ( poster_filter )
if len ( poster_matches ) > 0 :
poster = ImageData ( " asset_directory " , os . path . abspath ( poster_matches [ 0 ] ) , prefix = f " { item . title } ' s " , is_url = False )
background_matches = util . glob_filter ( background_filter )
if len ( background_matches ) > 0 :
background = ImageData ( " asset_directory " , os . path . abspath ( background_matches [ 0 ] ) , prefix = f " { item . title } ' s " , is_poster = False , is_url = False )
if item_dir and self . dimensional_asset_rename and ( not poster or not background ) :
for file in util . glob_filter ( os . path . join ( item_dir , " *.* " ) ) :
if file . lower ( ) . endswith ( ( " .jpg " , " .png " , " .jpeg " ) ) :
image = Image . open ( poster )
_w , _h = image . size
image . close ( )
if not poster and _h > _w :
new_path = os . path . join ( os . path . dirname ( file ) , f " poster { os . path . splitext ( file ) [ 1 ] . lower ( ) } " )
os . rename ( file , new_path )
poster = ImageData ( " asset_directory " , os . path . abspath ( new_path ) , prefix = f " { item . title } ' s " , is_url = False )
elif not background and _w > _h :
new_path = os . path . join ( os . path . dirname ( file ) , f " background { os . path . splitext ( file ) [ 1 ] . lower ( ) } " )
os . rename ( file , new_path )
background = ImageData ( " asset_directory " , os . path . abspath ( new_path ) , prefix = f " { item . title } ' s " , is_poster = False , is_url = False )
if poster and background :
break
if poster or background :
if upload :
self . upload_images ( item , poster = poster , background = background , overlay = name )
else :
return poster , background
if isinstance ( item , Show ) :
missing_assets = " "
found_season = False
for season in self . query ( item . seasons ) :
season_name = f " Season { ' 0 ' if season . seasonNumber < 10 else ' ' } { season . seasonNumber } "
if item_dir :
season_poster_filter = os . path . join ( item_dir , f " { season_name } .* " )
season_background_filter = os . path . join ( item_dir , f " { season_name } _background.* " )
else :
season_poster_filter = os . path . join ( ad , f " { name } _ { season_name } .* " )
season_background_filter = os . path . join ( ad , f " { name } _ { season_name } _background.* " )
season_poster = None
season_background = None
matches = util . glob_filter ( season_poster_filter )
if len ( matches ) > 0 :
season_poster = ImageData ( " asset_directory " , os . path . abspath ( matches [ 0 ] ) , prefix = f " { item . title } Season { season . seasonNumber } ' s " , is_url = False )
found_season = True
elif season . seasonNumber > 0 :
missing_assets + = f " \n Missing Season { season . seasonNumber } Poster "
matches = util . glob_filter ( season_background_filter )
if len ( matches ) > 0 :
season_background = ImageData ( " asset_directory " , os . path . abspath ( matches [ 0 ] ) , prefix = f " { item . title } Season { season . seasonNumber } ' s " , is_poster = False , is_url = False )
if season_poster or season_background :
self . upload_images ( season , poster = season_poster , background = season_background )
for episode in self . query ( season . episodes ) :
if item_dir :
episode_filter = os . path . join ( item_dir , f " { episode . seasonEpisode . upper ( ) } .* " )
else :
episode_filter = os . path . join ( ad , f " { name } _ { episode . seasonEpisode . upper ( ) } .* " )
matches = util . glob_filter ( episode_filter )
if len ( matches ) > 0 :
episode_poster = ImageData ( " asset_directory " , os . path . abspath ( matches [ 0 ] ) , prefix = f " { item . title } { episode . seasonEpisode . upper ( ) } ' s " , is_url = False )
self . upload_images ( episode , poster = episode_poster )
if self . show_missing_season_assets and found_season and missing_assets :
util . print_multiline ( f " Missing Season Posters for { item . title } { missing_assets } " , info = True )
if isinstance ( item , ( Movie , Show ) ) and not poster and name :
self . upload_images ( item , overlay = name )
if self . create_asset_folders and self . asset_folders and not found_folder :
os . makedirs ( os . path . join ( self . asset_directory [ 0 ] , name ) , exist_ok = True )
logger . info ( f " Asset Directory Created: { os . path . join ( self . asset_directory [ 0 ] , name ) } " )
elif isinstance ( item , ( Movie , Show ) ) and not name and self . asset_folders and not found_folder :
logger . error ( f " Asset Warning: No asset folder found called ' { name } ' " )
elif isinstance ( item , ( Movie , Show ) ) and not poster and not background and self . show_missing_assets :
logger . error ( f " Asset Warning: No poster or background found in an assets folder for ' { name } ' " )
return None , None