import logging , math , re , requests
from lxml import html
from modules import util
from modules . util import Failed
from retrying import retry
# Module-level logger; the IMDbAPI methods below write their debug, info and
# warning messages through it.
logger = logging . getLogger ( " Plex Meta Manager " )
class IMDbAPI:
    """Turn IMDb lists, searches and IDs into TMDb movie IDs and TVDb show IDs.

    IMDb result pages are scraped for IMDb IDs, which are then converted with
    the TMDb and/or Trakt helpers handed to the constructor.
    """

    # IMDb result pages are requested in chunks of at most this many titles.
    _PAGE_SIZE = 250
    _LIST_URL_PREFIX = "https://www.imdb.com/list/ls"
    _SEARCH_URL_PREFIX = "https://www.imdb.com/search/title/?"

    def __init__(self, Cache=None, TMDb=None, Trakt=None, TVDb=None):
        """Store the helper objects used for ID conversion.

        Args:
            Cache: Optional cache of earlier IMDb -> TMDb/TVDb conversions.
            TMDb: Optional TMDb helper (converts IDs and validates movies).
            Trakt: Optional Trakt helper (converts IDs).
            TVDb: Optional TVDb helper (validates shows).

        Raises:
            Failed: If neither ``TMDb`` nor ``Trakt`` is supplied.
        """
        if TMDb is None and Trakt is None:
            raise Failed("IMDb Error: IMDb requires either TMDb or Trakt")
        self.Cache = Cache
        self.TMDb = TMDb
        self.Trakt = Trakt
        self.TVDb = TVDb

    def get_imdb_ids_from_url(self, imdb_url, language, limit):
        """Scrape the IMDb IDs shown at an IMDb list or title-search URL.

        Args:
            imdb_url: URL beginning with the IMDb list or title-search prefix.
            language: Value sent as the ``Accept-Language`` request header.
            limit: Maximum number of IDs to collect; a value below 1, or above
                the number of titles the page reports, means "all of them".

        Returns:
            The non-empty list of ``data-tconst`` values scraped from the
            result pages.

        Raises:
            Failed: If the URL has an unsupported prefix, the list ID or the
                result count cannot be parsed, or no IDs are found.
        """
        imdb_url = imdb_url.strip()
        if not imdb_url.startswith((self._LIST_URL_PREFIX, self._SEARCH_URL_PREFIX)):
            raise Failed(
                f"IMDb Error: {imdb_url} must begin with either:"
                f"\n| https://www.imdb.com/list/ls (For Lists)"
                f"\n| https://www.imdb.com/search/title/? (For Searches)"
            )

        if imdb_url.startswith(self._LIST_URL_PREFIX):
            # Lists are read through the search endpoint so both URL kinds
            # share the same paging and scraping code below.
            list_match = re.search(r"(\d+)", imdb_url)
            if list_match is None:
                raise Failed(f"IMDb Error: Failed to parse List ID from {imdb_url}")
            current_url = f"https://www.imdb.com/search/title/?lists=ls{list_match.group(1)}"
        else:
            current_url = imdb_url

        header = {"Accept-Language": language}
        try:
            results = self.send_request(current_url, header).xpath("//div[@class='desc']/span/text()")[0].replace(",", "")
        except IndexError:
            raise Failed(f"IMDb Error: Failed to parse URL: {imdb_url}")
        try:
            total = int(re.findall(r"(\d+) title", results)[0])
        except IndexError:
            raise Failed(f"IMDb Error: No Results at URL: {imdb_url}")

        # Drop any paging parameters from the URL; they are re-added per page.
        current_url = re.sub(r"&start=\d+", "", current_url)
        current_url = re.sub(r"&count=\d+", "", current_url)

        if limit < 1 or total < limit:
            limit = total
        page_size = self._PAGE_SIZE
        num_of_pages = math.ceil(int(limit) / page_size)

        length = 0
        imdb_ids = []
        for i in range(1, num_of_pages + 1):
            start_num = (i - 1) * page_size + 1
            last_num = min(i * page_size, limit)
            length = util.print_return(length, f"Parsing Page {i}/{num_of_pages} {start_num}-{last_num}")
            # The final page only asks for the titles still needed to reach limit.
            response = self.send_request(f"{current_url}&count={last_num - start_num + 1}&start={start_num}", header)
            imdb_ids.extend(response.xpath("//div[contains(@class, 'lister-item-image')]//a/img//@data-tconst"))
        util.print_end(length)

        if not imdb_ids:
            raise Failed(f"IMDb Error: No Movies Found at {imdb_url}")
        return imdb_ids

    # NOTE(review): requests.get is called without a timeout, so a stalled
    # connection blocks instead of raising and triggering a retry.
    @retry(stop_max_attempt_number=6, wait_fixed=10000)
    def send_request(self, url, header):
        """Fetch ``url`` with the given headers and return the parsed HTML tree.

        The call is retried up to 6 times with a fixed 10 second wait.
        """
        return html.fromstring(requests.get(url, headers=header).content)

    def get_items(self, method, data, language, status_message=True):
        """Resolve one ``imdb_id`` or ``imdb_list`` entry into TMDb/TVDb IDs.

        Args:
            method: ``"imdb_id"`` or ``"imdb_list"``.
            data: The IMDb ID for ``imdb_id``; for ``imdb_list`` a mapping with
                ``"url"`` and ``"limit"`` entries.
            language: Passed through to the scraping and conversion calls.
            status_message: When true, progress is written to the logger.

        Returns:
            A ``(movie_ids, show_ids)`` tuple of lists.

        Raises:
            Failed: If ``method`` is not supported, or if scraping or the
                single-ID conversion fails. Conversion failures for
                individual list entries are logged as warnings and skipped.
        """
        pretty = util.pretty_names[method] if method in util.pretty_names else method
        if status_message:
            logger.debug(f"Data: {data}")
        show_ids = []
        movie_ids = []
        if method == "imdb_id":
            if status_message:
                logger.info(f"Processing {pretty}: {data}")
            tmdb_id, tvdb_id = self.convert_from_imdb(data, language)
            if tmdb_id:
                movie_ids.append(tmdb_id)
            if tvdb_id:
                show_ids.append(tvdb_id)
        elif method == "imdb_list":
            if status_message:
                status = f"{data['limit']} Items at " if data['limit'] > 0 else ''
                logger.info(f"Processing {pretty}: {status}{data['url']}")
            imdb_ids = self.get_imdb_ids_from_url(data["url"], language, data["limit"])
            total_ids = len(imdb_ids)
            length = 0
            for i, imdb_id in enumerate(imdb_ids, 1):
                length = util.print_return(length, f"Converting IMDb ID {i}/{total_ids}")
                try:
                    tmdb_id, tvdb_id = self.convert_from_imdb(imdb_id, language)
                except Failed as e:
                    # One unresolvable entry must not abort the whole list.
                    logger.warning(e)
                    continue
                if tmdb_id:
                    movie_ids.append(tmdb_id)
                if tvdb_id:
                    show_ids.append(tvdb_id)
            util.print_end(length, f"Processed {total_ids} IMDb IDs")
        else:
            raise Failed(f"IMDb Error: Method {method} not supported")
        if status_message:
            logger.debug(f"TMDb IDs Found: {movie_ids}")
            logger.debug(f"TVDb IDs Found: {show_ids}")
        return movie_ids, show_ids

    def convert_from_imdb(self, imdb_id, language):
        """Convert an IMDb ID into a ``(tmdb_id, tvdb_id)`` pair.

        The cache (if any) is consulted first. Otherwise TMDb and then Trakt
        are asked for a movie ID and then a show ID, stopping at the first
        hit. A freshly resolved ID is validated with ``TMDb.get_movie`` /
        ``TVDb.get_series`` when that helper is available; without the helper
        the ID is returned unvalidated.

        Args:
            imdb_id: The IMDb ID to convert.
            language: Passed to ``TVDb.get_series`` during validation.

        Returns:
            ``(tmdb_id, tvdb_id)``; at least one of the two is truthy.

        Raises:
            Failed: If neither a TMDb ID nor a TVDb ID could be found.
        """
        tmdb_id = None
        tvdb_id = None
        # Stay False unless the cache reports a value for the entry; anything
        # else is forwarded to Cache.update_imdb at the end.
        update_tmdb = False
        update_tvdb = False
        if self.Cache:
            tmdb_id, tvdb_id = self.Cache.get_ids_from_imdb(imdb_id)
            if not tmdb_id:
                tmdb_id, update_tmdb = self.Cache.get_tmdb_from_imdb(imdb_id)
                if update_tmdb:
                    tmdb_id = None
            if not tvdb_id:
                tvdb_id, update_tvdb = self.Cache.get_tvdb_from_imdb(imdb_id)
                if update_tvdb:
                    tvdb_id = None
        from_cache = tmdb_id is not None or tvdb_id is not None

        if not tmdb_id and not tvdb_id:
            for service in (self.TMDb, self.Trakt):
                if not service:
                    continue
                try:
                    tmdb_id = service.convert_imdb_to_tmdb(imdb_id)
                except Failed:
                    pass
                if tmdb_id:
                    break
                try:
                    tvdb_id = service.convert_imdb_to_tvdb(imdb_id)
                except Failed:
                    pass
                if tvdb_id:
                    break

        if not from_cache:
            # TMDb may be None when only Trakt was supplied and TVDb defaults
            # to None, so validate only with helpers that actually exist.
            if tmdb_id and self.TMDb:
                try:
                    self.TMDb.get_movie(tmdb_id)
                except Failed:
                    tmdb_id = None
            if tvdb_id and self.TVDb:
                try:
                    self.TVDb.get_series(language, tvdb_id=tvdb_id)
                except Failed:
                    tvdb_id = None

        if not tmdb_id and not tvdb_id:
            raise Failed(f"IMDb Error: No TMDb ID or TVDb ID found for IMDb: {imdb_id}")
        if self.Cache:
            if tmdb_id and update_tmdb is not False:
                self.Cache.update_imdb("movie", update_tmdb, imdb_id, tmdb_id)
            if tvdb_id and update_tvdb is not False:
                self.Cache.update_imdb("show", update_tvdb, imdb_id, tvdb_id)
        return tmdb_id, tvdb_id