@ -1,6 +1,5 @@
import csv , gzip , json , math , os , re , shutil , time
import csv , gzip , json , math , os , re , shutil
from modules import util
from modules . request import parse_qs , urlparse
from modules . util import Failed
logger = util . logger
@ -127,6 +126,7 @@ base_url = "https://www.imdb.com"
git_base = " https://raw.githubusercontent.com/Kometa-Team/IMDb-Awards/master "
search_hash_url = " https://raw.githubusercontent.com/Kometa-Team/IMDb-Hash/master/HASH "
list_hash_url = " https://raw.githubusercontent.com/Kometa-Team/IMDb-Hash/master/LIST_HASH "
watchlist_hash_url = " https://raw.githubusercontent.com/Kometa-Team/IMDb-Hash/master/WATCHLIST_HASH "
graphql_url = " https://api.graphql.imdb.com/ "
list_url = f " { base_url } /list/ls "
@ -142,6 +142,7 @@ class IMDb:
self . _events = { }
self . _search_hash = None
self . _list_hash = None
self . _watchlist_hash = None
self . event_url_validation = { }
def _request ( self , url , language = None , xpath = None , params = None ) :
@ -166,6 +167,12 @@ class IMDb:
self . _list_hash = self . requests . get ( list_hash_url ) . text . strip ( )
return self . _list_hash
@property
def watchlist_hash ( self ) :
if self . _watchlist_hash is None :
self . _watchlist_hash = self . requests . get ( watchlist_hash_url ) . text . strip ( )
return self . _watchlist_hash
@property
def events_validation ( self ) :
if self . _events_validation is None :
@ -177,32 +184,44 @@ class IMDb:
self . _events [ event_id ] = self . requests . get_yaml ( f " { git_base } /events/ { event_id } .yml " ) . data
return self . _events [ event_id ]
def validate_imdb _lists ( self , err_type , imdb_lis ts) :
def validate_imdb ( self , err_type , method, imdb_dic ts) :
valid_lists = [ ]
for imdb_dict in util . get_list ( imdb_lists , split = False ) :
main = " list_id " if method == " imdb_list " else " user_id "
for imdb_dict in util . get_list ( imdb_dicts , split = True if isinstance ( imdb_dicts , str ) else False ) :
if not isinstance ( imdb_dict , dict ) :
imdb_dict = { " list_id " : imdb_dict }
if " url " in imdb_dict and " list_id " not in imdb_dict :
imdb_dict [ " list_id " ] = imdb_dict [ " url " ]
imdb_dict = { main : imdb_dict }
if " url " in imdb_dict and main not in imdb_dict :
imdb_dict [ main ] = imdb_dict [ " url " ]
dict_methods = { dm . lower ( ) : dm for dm in imdb_dict }
if " list_id " not in dict_methods :
raise Failed ( f " { err_type } Error: imdb_list list_id attribute not found" )
elif imdb_dict [ dict_methods [ " list_id " ] ] is None :
raise Failed ( f " { err_type } Error: imdb_list list_id attribute is blank" )
if main not in dict_methods :
raise Failed ( f " { err_type } Error: { method } { main } attribute not found" )
elif imdb_dict [ dict_methods [ main ] ] is None :
raise Failed ( f " { err_type } Error: { method } { main } attribute is blank" )
else :
imdb_url = imdb_dict [ dict_methods [ " list_id " ] ] . strip ( )
if imdb_url . startswith ( f " { base_url } /search/ " ) :
raise Failed ( " IMDb Error: URLs with https://www.imdb.com/search/ no longer works with imdb_list use imdb_search. " )
if imdb_url . startswith ( f " { base_url } /filmosearch/ " ) :
raise Failed ( " IMDb Error: URLs with https://www.imdb.com/filmosearch/ no longer works with imdb_list use imdb_search. " )
search = re . search ( r " (ls \ d+) " , imdb_url )
main_data = imdb_dict [ dict_methods [ main ] ] . strip ( )
if method == " imdb_list " :
if main_data . startswith ( f " { base_url } /search/ " ) :
raise Failed ( f " IMDb Error: URLs with https://www.imdb.com/search/ no longer works with { method } use imdb_search. " )
if main_data . startswith ( f " { base_url } /filmosearch/ " ) :
raise Failed ( f " IMDb Error: URLs with https://www.imdb.com/filmosearch/ no longer works with { method } use imdb_search. " )
search = re . search ( r " (ls \ d+) " , main_data )
if not search :
raise Failed ( " IMDb Error: imdb_list list_id must begin with ls (ex. ls005526372) " )
new_dict = { " list_id " : search . group ( 1 ) }
raise Failed ( f " IMDb Error: { method } { main } must begin with ls (ex. ls005526372) " )
new_dict = { main : search . group ( 1 ) }
else :
user_id = None
if main_data . startswith ( " ur " ) :
try :
user_id = int ( main_data [ 2 : ] )
except ValueError :
pass
if not user_id :
raise Failed ( f " { err_type } Error: { method } { main } : { main_data } not in the format of ' ur######## ' " )
new_dict = { main : main_data }
if " limit " in dict_methods :
if imdb_dict [ dict_methods [ " limit " ] ] is None :
logger . warning ( f " { err_type } Warning: imdb_list limit attribute is blank using 0 as default " )
logger . warning ( f " { err_type } Warning: { method } limit attribute is blank using 0 as default" )
else :
try :
value = int ( str ( imdb_dict [ dict_methods [ " limit " ] ] ) )
@ -211,31 +230,16 @@ class IMDb:
except ValueError :
pass
if " limit " not in new_dict :
logger . warning ( f " { err_type } Warning: imdb_list limit attribute: { imdb_dict [ dict_methods [ ' limit ' ] ] } must be an integer 0 or greater using 0 as default " )
logger . warning ( f " { err_type } Warning: { method } limit attribute: { imdb_dict [ dict_methods [ ' limit ' ] ] } must be an integer 0 or greater using 0 as default " )
if " limit " not in new_dict :
new_dict [ " limit " ] = 0
if " sort_by " in dict_methods :
new_dict [ " sort_by " ] = util . parse ( err_type , dict_methods , imdb_dict , parent = " imdb_list " , default = " custom.asc " , options = list_sort_options )
new_dict [ " sort_by " ] = util . parse ( err_type , dict_methods , imdb_dict , parent = method , default = " custom.asc " , options = list_sort_options )
valid_lists . append ( new_dict )
return valid_lists
def validate_imdb_watchlists ( self , err_type , users , language ) :
valid_users = [ ]
for user in util . get_list ( users ) :
user_id = None
if user . startswith ( " ur " ) :
try :
user_id = int ( user [ 2 : ] )
except ValueError :
pass
if not user_id :
raise Failed ( f " { err_type } Error: User { user } not in the format of ' ur######## ' " )
if self . _watchlist ( user , language ) :
valid_users . append ( user )
return valid_users
def get_event_years ( self , event_id ) :
if event_id in self . events_validation :
return True , self . events_validation [ event_id ] [ " years " ]
@ -263,16 +267,16 @@ class IMDb:
break
return award_names , category_names
def _ watchlist( self , user , languag e) :
imdb_url = f " { base_url } /user/ { user } /watchlist "
for text in self . _request ( imdb_url , language = language , xpath = " //div[@class= ' article ' ]/script/text() " ) [ 0 ] . split ( " \n " ) :
if text . strip ( ) . startswith ( " IMDbReactInitialState.push " ) :
jsonline = text . strip ( )
r eturn [ f for f in json . load s( jsonlin e[jsonline . find ( ' { ' ) :- 2 ] ) [ " starbars " ] ]
raise Failed ( f " IMDb Error: Failed to parse URL: { imdb_url } " )
def _ json_operation( self , list_typ e) :
if list_type == " search " :
return " AdvancedTitleSearch " , self . search_hash
elif list_type == " list " :
return " TitleListMainPage " , self . list_hash
else :
return " WatchListPageRefiner " , self . watchlist_hash
def _graphql_json ( self , data , search= Tru e) :
page_limit = 250 if search else 100
def _graphql_json ( self , data , list_typ e) :
page_limit = 250 if list_type == " search" else 100
out = {
" locale " : " en-US " ,
" first " : data [ " limit " ] if " limit " in data and 0 < data [ " limit " ] < page_limit else page_limit ,
@ -302,10 +306,10 @@ class IMDb:
if range_data :
out [ constraint ] [ range_name [ i ] ] = range_data
sort = data [ " sort_by " ] if " sort_by " in data else " popularity.asc " if search else " custom.asc "
sort = data [ " sort_by " ] if " sort_by " in data else " popularity.asc " if list_type == " search" else " custom.asc "
sort_by , sort_order = sort . split ( " . " )
if search:
if list_type == " search" :
out [ " titleTypeConstraint " ] = { " anyTitleTypeIds " : [ title_type_options [ t ] for t in data [ " type " ] ] if " type " in data else [ ] }
out [ " sortBy " ] = sort_by_options [ sort_by ]
out [ " sortOrder " ] = sort_order . upper ( )
@ -373,24 +377,26 @@ class IMDb:
if " adult " in data and data [ " adult " ] :
out [ " explicitContentConstraint " ] = { " explicitContentFilter " : " INCLUDE_ADULT " }
else :
if list_type == " list " :
out [ " lsConst " ] = data [ " list_id " ]
else :
out [ " urConst " ] = data [ " user_id " ]
out [ " sort " ] = { " by " : list_sort_by_options [ sort_by ] , " order " : sort_order . upper ( ) }
logger . trace ( out )
return {
" operationName " : " AdvancedTitleSearch " if search else " TitleListMainPage " ,
" variables " : out ,
" extensions " : { " persistedQuery " : { " version " : 1 , " sha256Hash " : self . search_hash if search else self . list_hash } }
}
op , sha = self . _json_operation ( list_type )
return { " operationName " : op , " variables " : out , " extensions " : { " persistedQuery " : { " version " : 1 , " sha256Hash " : sha } } }
def _pagination ( self , data , search = True ) :
json_obj = self . _graphql_json ( data , search = search )
item_count = 250 if search else 100
def _pagination ( self , data , list_type ) :
is_list = list_type != " search "
json_obj = self . _graphql_json ( data , list_type )
item_count = 100 if is_list else 250
imdb_ids = [ ]
logger . ghost ( " Parsing Page 1 " )
response_json = self . _graph_request ( json_obj )
try :
search_data = response_json [ " data " ] [ " advancedTitleSearch " ] if search else response_json [ " data " ] [ " list " ] [ " titleListItemSearch " ]
step = " list " if list_type == " list " else " predefinedList "
search_data = response_json [ " data " ] [ step ] [ " titleListItemSearch " ] if is_list else response_json [ " data " ] [ " advancedTitleSearch " ]
total = search_data [ " total " ]
limit = data [ " limit " ]
if limit < 1 or total < limit :
@ -400,16 +406,16 @@ class IMDb:
remainder = item_count
num_of_pages = math . ceil ( int ( limit ) / item_count )
end_cursor = search_data [ " pageInfo " ] [ " endCursor " ]
imdb_ids . extend ( [ n [ " node" ] [ " title " ] [ " id " ] if search else n [ " listItem " ] [ " id " ] for n in search_data [ " edges " ] ] )
imdb_ids . extend ( [ n [ " listItem " ] [ " id " ] if is_list else n [ " node " ] [ " title " ] [ " id " ] for n in search_data [ " edges " ] ] )
if num_of_pages > 1 :
for i in range ( 2 , num_of_pages + 1 ) :
start_num = ( i - 1 ) * item_count + 1
logger . ghost ( f " Parsing Page { i } / { num_of_pages } { start_num } - { limit if i == num_of_pages else i * item_count } " )
json_obj [ " variables " ] [ " after " ] = end_cursor
response_json = self . _graph_request ( json_obj )
search_data = response_json [ " data " ] [ " advancedTitleSearch " ] if search else response_json [ " data " ] [ " list" ] [ " titleListItem Search" ]
search_data = response_json [ " data " ] [ step ] [ " titleListItemSearch " ] if is_list else response_json [ " data " ] [ " advancedTitle Search" ]
end_cursor = search_data [ " pageInfo " ] [ " endCursor " ]
ids_found = [ n [ " node" ] [ " title " ] [ " id " ] if search else n [ " listItem " ] [ " id " ] for n in search_data [ " edges " ] ]
ids_found = [ n [ " listItem " ] [ " id " ] if is_list else n [ " node " ] [ " title " ] [ " id " ] for n in search_data [ " edges " ] ]
if i == num_of_pages :
ids_found = ids_found [ : remainder ]
imdb_ids . extend ( ids_found )
@ -511,19 +517,16 @@ class IMDb:
if method == " imdb_id " :
logger . info ( f " Processing IMDb ID: { data } " )
return [ ( data , " imdb " ) ]
elif method == " imdb_list " :
logger . info ( f " Processing IMDb List: { data [ ' list _id' ] } " )
elif method in [ " imdb_list " , " imdb_watchlist " ] :
logger . info ( f " Processing IMDb { ' List' if method == ' imdb_list ' else ' Watchlist ' } : { data [ ' list _id' if method == ' imdb_list ' else ' user _id' ] } " )
if data [ " limit " ] > 0 :
logger . info ( f " Limit: { data [ ' limit ' ] } " )
if " sort_by " in data :
logger . info ( f " Sort By: { data [ ' sort_by ' ] } " )
return [ ( i , " imdb " ) for i in self . _pagination ( data , search = False ) ]
return [ ( i , " imdb " ) for i in self . _pagination ( data , " list " if method == " imdb_list " else " watchlist " ) ]
elif method == " imdb_chart " :
logger . info ( f " Processing IMDb Chart: { charts [ data ] } " )
return [ ( _i , " imdb " ) for _i in self . _ids_from_chart ( data , language ) ]
elif method == " imdb_watchlist " :
logger . info ( f " Processing IMDb Watchlist: { data } " )
return [ ( _i , " imdb " ) for _i in self . _watchlist ( data , language ) ]
elif method == " imdb_award " :
if data [ " event_year " ] not in [ " all " , " latest " ] and len ( data [ " event_year " ] ) == 1 :
event_slug = f " { data [ ' event_year ' ] [ 0 ] } /1 " if " - " not in data [ " event_year " ] [ 0 ] else data [ " event_year " ] [ 0 ] . replace ( " - " , " / " )
@ -538,7 +541,7 @@ class IMDb:
logger . info ( f " Processing IMDb Search: " )
for k , v in data . items ( ) :
logger . info ( f " { k } : { v } " )
return [ ( _i , " imdb " ) for _i in self . _pagination ( data )]
return [ ( _i , " imdb " ) for _i in self . _pagination ( data , " search " )]
else :
raise Failed ( f " IMDb Error: Method { method } not supported " )