import logging , math , re , time
from modules import util
from modules . util import Failed
from urllib . parse import urlparse , parse_qs
# Shared logger used by this module for progress and debug output.
logger = logging.getLogger("Plex Meta Manager")

# Builder method names handled by IMDb.get_imdb_ids.
builders = ["imdb_list", "imdb_id"]

base_url = "https://www.imdb.com"

# URL prefixes for the three supported page types; an imdb_list URL must
# start with one of these to pass validation.
urls = {
    page_type: f"{base_url}{suffix}"
    for page_type, suffix in (
        ("list", "/list/ls"),
        ("search", "/search/title/"),
        ("keyword", "/search/keyword/"),
    )
}

# XPath expressions: "imdb_id" pulls the data-tconst attribute of each listed
# item; the other three select the text nodes that are scanned for the
# "<N> title" total on the matching page type.
xpath = {
    "imdb_id": "//div[contains(@class, 'lister-item-image')]//a/img//@data-tconst",
    "list": "//div[@class='desc lister-total-num-results']/text()",
    "search": "//div[@class='desc']/span/text()",
    "keyword": "//div[@class='desc']/text()",
}

# Page size used for the pagination arithmetic, keyed by page type.
item_counts = {"list": 100, "search": 250, "keyword": 50}
class IMDb:
    """Collects IMDb IDs from IMDb list, title-search and keyword-search pages.

    Pages are fetched through ``config.get_html`` and parsed with the
    module-level ``xpath`` expressions.
    """

    def __init__(self, config):
        """Store the config object whose ``get_html`` is used for every fetch."""
        self.config = config

    def validate_imdb_lists(self, imdb_lists, language):
        """Normalize and validate the ``imdb_list`` builder input.

        Each entry is either a bare URL or a dict holding ``url`` and an
        optional ``limit`` (keys are matched case-insensitively).

        Returns a list of ``{"url": str, "limit": int}`` dicts, where a limit
        of 0 is used when none was given.

        Raises ``Failed`` when a URL does not start with a supported prefix;
        whatever ``_total`` raises for an unparseable page propagates as-is.
        """
        accepted_prefixes = (urls["list"], urls["search"], urls["keyword"])
        valid_lists = []
        for entry in util.get_list(imdb_lists, split=False):
            if not isinstance(entry, dict):
                entry = {"url": entry}
            # Map lower-cased key -> key as written, so lookups ignore case.
            dict_methods = {key.lower(): key for key in entry}
            imdb_url = util.parse("url", entry, methods=dict_methods, parent="imdb_list").strip()
            if not imdb_url.startswith(accepted_prefixes):
                raise Failed(f"IMDb Error: {imdb_url} must begin with either:\n{urls['list']} (For Lists)\n{urls['search']} (For Searches)\n{urls['keyword']} (For Keyword Searches)")
            # Called only for its failure mode: it raises if no total can be read.
            self._total(imdb_url, language)
            if "limit" in dict_methods:
                list_count = util.parse("limit", entry, datatype="int", methods=dict_methods, default=0, parent="imdb_list", minimum=0)
            else:
                list_count = 0
            valid_lists.append({"url": imdb_url, "limit": list_count})
        return valid_lists

    def _total(self, imdb_url, language):
        """Return ``(total_titles, items_per_page)`` for an IMDb URL.

        The page type is chosen from the URL prefix (keyword, then list,
        otherwise search). The total is the first "<N> title" number found in
        the text nodes selected for that page type.

        Raises ``ValueError`` when no positive total can be extracted.
        """
        headers = util.header(language)
        if imdb_url.startswith(urls["keyword"]):
            page_type = "keyword"
        elif imdb_url.startswith(urls["list"]):
            page_type = "list"
        else:
            page_type = "search"

        total = 0
        for text in self.config.get_html(imdb_url, headers=headers).xpath(xpath[page_type]):
            if "title" not in text:
                continue
            # Thousands separators are removed so "1,234 titles" parses as 1234.
            found = re.search(r"(\d+) title", text.replace(",", ""))
            if found is not None:
                total = int(found.group(1))
                break

        if total < 1:
            raise ValueError(f"IMDb Error: Failed to parse URL: {imdb_url}")
        return total, item_counts[page_type]

    def _ids_from_url(self, imdb_url, language, limit):
        """Fetch up to ``limit`` IMDb IDs from ``imdb_url``, page by page.

        A ``limit`` below 1, or above the page's reported total, means
        "everything". Any ``start``/``count``/``page`` query parameters already
        in the URL are dropped and replaced per request. The method sleeps two
        seconds after each page fetch.

        Raises ``ValueError`` when no IDs were collected (or when ``_total``
        cannot parse the page).
        """
        total, item_count = self._total(imdb_url, language)
        headers = util.header(language)

        split_url = urlparse(imdb_url)
        params = parse_qs(split_url.query)
        imdb_base = split_url._replace(query=None).geturl()
        for paging_key in ("start", "count", "page"):
            params.pop(paging_key, None)

        if limit < 1 or total < limit:
            limit = total
        # Number of items wanted from the final page; a full page when the
        # limit divides evenly.
        remainder = limit % item_count or item_count
        num_of_pages = math.ceil(int(limit) / item_count)
        # List and keyword URLs are requested with a "page" number; search URLs
        # are requested with "count" and "start" instead.
        paged_by_number = imdb_base.startswith((urls["list"], urls["keyword"]))

        imdb_ids = []
        for i in range(1, num_of_pages + 1):
            last_page = i == num_of_pages
            start_num = (i - 1) * item_count + 1
            util.print_return(f"Parsing Page {i}/{num_of_pages} {start_num}-{limit if i == num_of_pages else i * item_count}")
            if paged_by_number:
                params["page"] = i
            else:
                params["count"] = remainder if last_page else item_count
                params["start"] = start_num
            ids_found = self.config.get_html(imdb_base, headers=headers, params=params).xpath(xpath["imdb_id"])
            if paged_by_number and last_page:
                # No "count" is sent for these URLs, so the final page is
                # trimmed locally to honor the limit.
                ids_found = ids_found[:remainder]
            imdb_ids.extend(ids_found)
            time.sleep(2)
        util.print_end()

        if not imdb_ids:
            raise ValueError(f"IMDb Error: No IMDb IDs Found at {imdb_url}")
        logger.debug(f"{len(imdb_ids)} IMDb IDs Found: {imdb_ids}")
        return imdb_ids

    def get_imdb_ids(self, method, data, language):
        """Resolve a builder ``method`` and its ``data`` to ``(id, "imdb")`` tuples.

        ``imdb_id`` returns ``data`` itself as the single ID; ``imdb_list``
        expects a dict with ``url`` and ``limit`` and scrapes the IDs from that
        URL.

        Raises ``Failed`` for any other method.
        """
        if method == "imdb_id":
            logger.info(f"Processing IMDb ID: {data}")
            return [(data, "imdb")]
        if method == "imdb_list":
            status = ""
            if data['limit'] > 0:
                status = f"{data['limit']} Items at "
            logger.info(f"Processing IMDb List: {status}{data['url']}")
            return [(imdb_id, "imdb") for imdb_id in self._ids_from_url(data["url"], language, data["limit"])]
        raise Failed(f"IMDb Error: Method {method} not supported")