import csv , gzip , json , math , os , re , shutil , time
from modules import util
from modules . request import parse_qs , urlparse
from modules . util import Failed
logger = util . logger
builders = [ " imdb_list " , " imdb_id " , " imdb_chart " , " imdb_watchlist " , " imdb_search " , " imdb_award " ]
movie_charts = [
" box_office " , " popular_movies " , " top_movies " , " top_english " , " lowest_rated " ,
" top_indian " , " top_tamil " , " top_telugu " , " top_malayalam " , " trending_india " , " trending_tamil " , " trending_telugu "
]
show_charts = [ " popular_shows " , " top_shows " , " trending_india " ]
charts = {
" box_office " : " Box Office " ,
" popular_movies " : " Most Popular Movies " ,
" popular_shows " : " Most Popular TV Shows " ,
" top_movies " : " Top 250 Movies " ,
" top_shows " : " Top 250 TV Shows " ,
" top_english " : " Top Rated English Movies " ,
" lowest_rated " : " Lowest Rated Movies " ,
" top_tamil " : " Top Rated Tamil Movies " ,
" top_telugu " : " Top Rated Telugu Movies " ,
" top_malayalam " : " Top Rated Malayalam Movies " ,
" trending_india " : " Trending Indian Movies & Shows " ,
" trending_tamil " : " Trending Tamil Movies " ,
" trending_telugu " : " Trending Telugu Movies " ,
" top_indian " : " Top Rated Indian Movies " ,
}
chart_urls = {
" box_office " : " chart/boxoffice " ,
" popular_movies " : " chart/moviemeter " ,
" popular_shows " : " chart/tvmeter " ,
" top_movies " : " chart/top " ,
" top_shows " : " chart/toptv " ,
" top_english " : " chart/top-english-movies " ,
" lowest_rated " : " chart/bottom " ,
" top_indian " : " india/top-rated-indian-movies " ,
" top_tamil " : " india/top-rated-tamil-movies " ,
" top_telugu " : " india/top-rated-telugu-movies " ,
" top_malayalam " : " india/top-rated-malayalam-movies " ,
" trending_india " : " india/upcoming " ,
" trending_tamil " : " india/tamil " ,
" trending_telugu " : " india/telugu " ,
}
imdb_search_attributes = [
" limit " , " sort_by " , " title " , " type " , " type.not " , " release.after " , " release.before " , " rating.gte " , " rating.lte " ,
" votes.gte " , " votes.lte " , " genre " , " genre.any " , " genre.not " , " topic " , " topic.any " , " topic.not " ,
" alternate_version " , " alternate_version.not " , " crazy_credit " , " crazy_credit.not " , " location " , " location.not " ,
" goof " , " goof.not " , " plot " , " plot.not " , " quote " , " quote.not " , " soundtrack " , " soundtrack.not " ,
" trivia " , " trivia.not " , " event " , " event.winning " , " imdb_top " , " imdb_bottom " , " company " , " content_rating " ,
" country " , " country.any " , " country.not " , " country.origin " , " keyword " , " keyword.any " ,
" keyword.not " , " series " , " series.not " , " list " , " list.any " , " list.not " , " language " , " language.any " , " language.not " ,
" language.primary " , " popularity.gte " , " popularity.lte " , " cast " , " cast.any " , " cast.not " , " runtime.gte " ,
" runtime.lte " , " adult " ,
]
sort_by_options = {
" popularity " : " POPULARITY " ,
" title " : " TITLE_REGIONAL " ,
" rating " : " USER_RATING " ,
" votes " : " USER_RATING_COUNT " ,
" box_office " : " BOX_OFFICE_GROSS_DOMESTIC " ,
" runtime " : " RUNTIME " ,
" year " : " YEAR " ,
" release " : " RELEASE_DATE " ,
}
sort_options = [ f " { a } . { d } " for a in sort_by_options for d in [ " asc " , " desc " ] ]
list_sort_by_options = {
" custom " : " LIST_ORDER " ,
" popularity " : " POPULARITY " ,
" title " : " TITLE_REGIONAL " ,
" rating " : " USER_RATING " ,
" votes " : " USER_RATING_COUNT " ,
" runtime " : " RUNTIME " ,
" added " : " DATE_ADDED " ,
" release " : " RELEASE_DATE " ,
}
list_sort_options = [ f " { a } . { d } " for a in sort_by_options for d in [ " asc " , " desc " ] ]
title_type_options = {
" movie " : " movie " , " tv_series " : " tvSeries " , " short " : " short " , " tv_episode " : " tvEpisode " , " tv_mini_series " : " tvMiniSeries " ,
" tv_movie " : " tvMovie " , " tv_special " : " tvSpecial " , " tv_short " : " tvShort " , " video_game " : " videoGame " , " video " : " video " ,
" music_video " : " musicVideo " , " podcast_series " : " podcastSeries " , " podcast_episode " : " podcastEpisode "
}
genre_options = { a . lower ( ) : a for a in [
" Action " , " Adventure " , " Animation " , " Biography " , " Comedy " , " Documentary " , " Drama " , " Crime " , " Family " , " History " ,
" News " , " Short " , " Western " , " Sport " , " Reality-TV " , " Horror " , " Fantasy " , " Film-Noir " , " Music " , " Romance " ,
" Talk-Show " , " Thriller " , " War " , " Sci-Fi " , " Musical " , " Mystery " , " Game-Show "
] }
topic_options = {
" alternate_version " : " ALTERNATE_VERSION " ,
" award " : " AWARD " ,
" business_info " : " BUSINESS_INFO " ,
" crazy_credit " : " CRAZY_CREDIT " ,
" goof " : " GOOF " ,
" location " : " LOCATION " ,
" plot " : " PLOT " ,
" quote " : " QUOTE " ,
" soundtrack " : " SOUNDTRACK " ,
" technical " : " TECHNICAL " ,
" trivia " : " TRIVIA " ,
}
company_options = {
" fox " : [ " co0000756 " , " co0176225 " , " co0201557 " , " co0017497 " ] ,
" dreamworks " : [ " co0067641 " , " co0040938 " , " co0252576 " , " co0003158 " ] ,
" mgm " : [ " co0007143 " , " co0026841 " ] ,
" paramount " : [ " co0023400 " ] ,
" sony " : [ " co0050868 " , " co0026545 " , " co0121181 " ] ,
" universal " : [ " co0005073 " , " co0055277 " , " co0042399 " ] ,
" disney " : [ " co0008970 " , " co0017902 " , " co0098836 " , " co0059516 " , " co0092035 " , " co0049348 " ] ,
" warner " : [ " co0002663 " , " co0005035 " , " co0863266 " , " co0072876 " , " co0080422 " , " co0046718 " ] ,
}
event_options = {
" cannes " : { " eventId " : " ev0000147 " } ,
" choice " : { " eventId " : " ev0000133 " } ,
" spirit " : { " eventId " : " ev0000349 " } ,
" sundance " : { " eventId " : " ev0000631 " } ,
" bafta " : { " eventId " : " ev0000123 " } ,
" oscar " : { " eventId " : " ev0000003 " } ,
" emmy " : { " eventId " : " ev0000223 " } ,
" golden " : { " eventId " : " ev0000292 " } ,
" oscar_picture " : { " eventId " : " ev0000003 " , " searchAwardCategoryId " : " bestPicture " } ,
" oscar_director " : { " eventId " : " ev0000003 " , " searchAwardCategoryId " : " bestDirector " } ,
" national_film_board_preserved " : { " eventId " : " ev0000468 " } ,
" razzie " : { " eventId " : " ev0000558 " } ,
}
base_url = " https://www.imdb.com "
git_base = " https://raw.githubusercontent.com/Kometa-Team/IMDb-Awards/master "
search_hash_url = " https://raw.githubusercontent.com/Kometa-Team/IMDb-Hash/master/HASH "
list_hash_url = " https://raw.githubusercontent.com/Kometa-Team/IMDb-Hash/master/LIST_HASH "
graphql_url = " https://api.graphql.imdb.com/ "
list_url = f " { base_url } /list/ls "
class IMDb :
def __init__ ( self , requests , cache , default_dir ) :
self . requests = requests
self . cache = cache
self . default_dir = default_dir
self . _ratings = None
self . _genres = None
self . _episode_ratings = None
self . _events_validation = None
self . _events = { }
self . _search_hash = None
self . _list_hash = None
self . event_url_validation = { }
def _request ( self , url , language = None , xpath = None , params = None ) :
logger . trace ( f " URL: { url } " )
if params :
logger . trace ( f " Params: { params } " )
response = self . requests . get_html ( url , params = params , header = True , language = language )
return response . xpath ( xpath ) if xpath else response
def _graph_request ( self , json_data ) :
return self . requests . post_json ( graphql_url , headers = { " content-type " : " application/json " } , json = json_data )
@property
def search_hash ( self ) :
if self . _search_hash is None :
self . _search_hash = self . requests . get ( search_hash_url ) . text . strip ( )
return self . _search_hash
@property
def list_hash ( self ) :
if self . _list_hash is None :
self . _list_hash = self . requests . get ( list_hash_url ) . text . strip ( )
return self . _list_hash
@property
def events_validation ( self ) :
if self . _events_validation is None :
self . _events_validation = self . requests . get_yaml ( f " { git_base } /event_validation.yml " ) . data
return self . _events_validation
def get_event ( self , event_id ) :
if event_id not in self . _events :
self . _events [ event_id ] = self . requests . get_yaml ( f " { git_base } /events/ { event_id } .yml " ) . data
return self . _events [ event_id ]
def validate_imdb_lists ( self , err_type , imdb_lists ) :
valid_lists = [ ]
for imdb_dict in util . get_list ( imdb_lists , split = False ) :
if not isinstance ( imdb_dict , dict ) :
imdb_dict = { " list_id " : imdb_dict }
if " url " in imdb_dict and " list_id " not in imdb_dict :
imdb_dict [ " list_id " ] = imdb_dict [ " url " ]
dict_methods = { dm . lower ( ) : dm for dm in imdb_dict }
if " list_id " not in dict_methods :
raise Failed ( f " { err_type } Error: imdb_list list_id attribute not found " )
elif imdb_dict [ dict_methods [ " list_id " ] ] is None :
raise Failed ( f " { err_type } Error: imdb_list list_id attribute is blank " )
else :
imdb_url = imdb_dict [ dict_methods [ " list_id " ] ] . strip ( )
if imdb_url . startswith ( f " { base_url } /search/ " ) :
raise Failed ( " IMDb Error: URLs with https://www.imdb.com/search/ no longer works with imdb_list use imdb_search. " )
if imdb_url . startswith ( f " { base_url } /filmosearch/ " ) :
raise Failed ( " IMDb Error: URLs with https://www.imdb.com/filmosearch/ no longer works with imdb_list use imdb_search. " )
search = re . search ( r " (ls \ d+) " , imdb_url )
if not search :
raise Failed ( " IMDb Error: imdb_list list_id must begin with ls (ex. ls005526372) " )
new_dict = { " list_id " : search . group ( 1 ) }
if " limit " in dict_methods :
if imdb_dict [ dict_methods [ " limit " ] ] is None :
logger . warning ( f " { err_type } Warning: imdb_list limit attribute is blank using 0 as default " )
else :
try :
value = int ( str ( imdb_dict [ dict_methods [ " limit " ] ] ) )
if 0 < = value :
new_dict [ " limit " ] = value
except ValueError :
pass
if " limit " not in new_dict :
logger . warning ( f " { err_type } Warning: imdb_list limit attribute: { imdb_dict [ dict_methods [ ' limit ' ] ] } must be an integer 0 or greater using 0 as default " )
if " limit " not in new_dict :
new_dict [ " limit " ] = 0
if " sort_by " in dict_methods :
new_dict [ " sort_by " ] = util . parse ( err_type , dict_methods , imdb_dict , parent = " imdb_list " , default = " custom.asc " , options = list_sort_options )
valid_lists . append ( new_dict )
return valid_lists
def validate_imdb_watchlists ( self , err_type , users , language ) :
valid_users = [ ]
for user in util . get_list ( users ) :
user_id = None
if user . startswith ( " ur " ) :
try :
user_id = int ( user [ 2 : ] )
except ValueError :
pass
if not user_id :
raise Failed ( f " { err_type } Error: User { user } not in the format of ' ur######## ' " )
if self . _watchlist ( user , language ) :
valid_users . append ( user )
return valid_users
def get_event_years ( self , event_id ) :
if event_id in self . events_validation :
return True , self . events_validation [ event_id ] [ " years " ]
if event_id not in self . event_url_validation :
self . event_url_validation [ event_id ] = [ ]
for event_link in self . _request ( f " { base_url } /event/ { event_id } " , xpath = " //div[@class= ' event-history-widget ' ]//a/@href " ) :
parts = event_link . split ( " / " )
self . event_url_validation [ event_id ] . append ( f " { parts [ 3 ] } { f ' - { parts [ 4 ] } ' if parts [ 4 ] != ' 1 ' else ' ' } " )
return False , self . event_url_validation [ event_id ]
def get_award_names ( self , event_id , event_year ) :
if event_id in self . events_validation :
return self . events_validation [ event_id ] [ " awards " ] , self . events_validation [ event_id ] [ " categories " ]
award_names = [ ]
category_names = [ ]
event_slug = f " { event_year [ 0 ] } /1 " if " - " not in event_year [ 0 ] else event_year [ 0 ] . replace ( " - " , " / " )
for text in self . _request ( f " { base_url } /event/ { event_id } / { event_slug } /?ref_=ev_eh " , xpath = " //div[@class= ' article ' ]/script/text() " ) [ 0 ] . split ( " \n " ) :
if text . strip ( ) . startswith ( " IMDbReactWidgets.NomineesWidget.push " ) :
jsonline = text . strip ( )
obj = json . loads ( jsonline [ jsonline . find ( " { " ) : - 3 ] )
for award in obj [ " nomineesWidgetModel " ] [ " eventEditionSummary " ] [ " awards " ] :
award_names . append ( award [ " awardName " ] )
for category in award [ " categories " ] :
category_names . append ( category [ " categoryName " ] )
break
return award_names , category_names
def _watchlist ( self , user , language ) :
imdb_url = f " { base_url } /user/ { user } /watchlist "
for text in self . _request ( imdb_url , language = language , xpath = " //div[@class= ' article ' ]/script/text() " ) [ 0 ] . split ( " \n " ) :
if text . strip ( ) . startswith ( " IMDbReactInitialState.push " ) :
jsonline = text . strip ( )
return [ f for f in json . loads ( jsonline [ jsonline . find ( ' { ' ) : - 2 ] ) [ " starbars " ] ]
raise Failed ( f " IMDb Error: Failed to parse URL: { imdb_url } " )
def _graphql_json ( self , data , search = True ) :
page_limit = 250 if search else 100
out = {
" locale " : " en-US " ,
" first " : data [ " limit " ] if " limit " in data and 0 < data [ " limit " ] < page_limit else page_limit ,
}
def check_constraint ( bases , mods , constraint , lower = " " , translation = None , range_name = None ) :
if not isinstance ( bases , list ) :
bases = [ bases ]
if range_name and not isinstance ( range_name , list ) :
range_name = [ range_name ]
for i , attr in enumerate ( bases ) :
attrs = [ ( f " { attr } . { m } " if m else attr , m , im ) for m , im in mods ]
if any ( [ m in data for m , _ , _ in attrs ] ) :
if constraint not in out :
out [ constraint ] = { }
range_data = { }
for full_attr , mod , imdb_mod in attrs :
if full_attr in data :
if range_name is not None :
range_data [ imdb_mod ] = data [ full_attr ]
elif translation is None :
out [ constraint ] [ f " { imdb_mod } { lower } " ] = data [ full_attr ]
elif isinstance ( translation , tuple ) :
out [ constraint ] [ f " { imdb_mod } { lower } " ] = [ d . replace ( translation [ 0 ] , translation [ 1 ] ) for d in data [ full_attr ] ]
elif isinstance ( translation , dict ) :
out [ constraint ] [ f " { imdb_mod } { lower } " ] = [ translation [ d ] for d in data [ full_attr ] ]
if range_data :
out [ constraint ] [ range_name [ i ] ] = range_data
sort = data [ " sort_by " ] if " sort_by " in data else " popularity.asc " if search else " custom.asc "
sort_by , sort_order = sort . split ( " . " )
if search :
out [ " titleTypeConstraint " ] = { " anyTitleTypeIds " : [ title_type_options [ t ] for t in data [ " type " ] ] if " type " in data else [ ] }
out [ " sortBy " ] = sort_by_options [ sort_by ]
out [ " sortOrder " ] = sort_order . upper ( )
check_constraint ( " type " , [ ( " not " , " excludeTitleTypeIds " ) ] , " titleTypeConstraint " , translation = title_type_options )
check_constraint ( " release " , [ ( " after " , " start " ) , ( " before " , " end " ) ] , " releaseDateConstraint " , range_name = " releaseDateRange " )
check_constraint ( " title " , [ ( " " , " searchTerm " ) ] , " titleTextConstraint " )
check_constraint ( [ " rating " , " votes " ] , [ ( " gte " , " min " ) , ( " lte " , " max " ) ] , " userRatingsConstraint " , range_name = [ " aggregateRatingRange " , " ratingsCountRange " ] )
check_constraint ( " genre " , [ ( " " , " all " ) , ( " any " , " any " ) , ( " not " , " exclude " ) ] , " genreConstraint " , lower = " GenreIds " , translation = genre_options )
check_constraint ( " topic " , [ ( " " , " all " ) , ( " any " , " any " ) , ( " not " , " no " ) ] , " withTitleDataConstraint " , lower = " DataAvailable " , translation = topic_options )
check_constraint ( " alternate_version " , [ ( " " , " all " ) , ( " any " , " any " ) ] , " alternateVersionMatchingConstraint " , lower = " AlternateVersionTextTerms " )
check_constraint ( " crazy_credit " , [ ( " " , " all " ) , ( " any " , " any " ) ] , " crazyCreditMatchingConstraint " , lower = " CrazyCreditTextTerms " )
check_constraint ( " location " , [ ( " " , " all " ) , ( " any " , " any " ) ] , " filmingLocationConstraint " , lower = " Locations " )
check_constraint ( " goof " , [ ( " " , " all " ) , ( " any " , " any " ) ] , " goofMatchingConstraint " , lower = " GoofTextTerms " )
check_constraint ( " plot " , [ ( " " , " all " ) , ( " any " , " any " ) ] , " plotMatchingConstraint " , lower = " PlotTextTerms " )
check_constraint ( " quote " , [ ( " " , " all " ) , ( " any " , " any " ) ] , " quoteMatchingConstraint " , lower = " QuoteTextTerms " )
check_constraint ( " soundtrack " , [ ( " " , " all " ) , ( " any " , " any " ) ] , " soundtrackMatchingConstraint " , lower = " SoundtrackTextTerms " )
check_constraint ( " trivia " , [ ( " " , " all " ) , ( " any " , " any " ) ] , " triviaMatchingConstraint " , lower = " TriviaTextTerms " )
if " event " in data or " event.winning " in data :
input_list = [ ]
if " event " in data :
input_list . extend ( [ event_options [ a ] if a in event_options else { " eventId " : a } for a in data [ " event " ] ] )
if " event.winning " in data :
for a in data [ " event.winning " ] :
award_dict = event_options [ a ] if a in event_options else { " eventId " : a }
award_dict [ " winnerFilter " ] = " WINNER_ONLY "
input_list . append ( award_dict )
out [ " awardConstraint " ] = { " allEventNominations " : input_list }
if any ( [ a in data for a in [ " imdb_top " , " imdb_bottom " , " popularity.gte " , " popularity.lte " ] ] ) :
ranges = [ ]
if " imdb_top " in data :
ranges . append ( { " rankRange " : { " max " : data [ " imdb_top " ] } , " rankedTitleListType " : " TOP_RATED_MOVIES " } )
if " imdb_bottom " in data :
ranges . append ( { " rankRange " : { " max " : data [ " imdb_bottom " ] } , " rankedTitleListType " : " LOWEST_RATED_MOVIES " } )
if " popularity.gte " in data or " popularity.lte " in data :
num_range = { }
if " popularity.lte " in data :
num_range [ " max " ] = data [ " popularity.lte " ]
if " popularity.gte " in data :
num_range [ " min " ] = data [ " popularity.gte " ]
ranges . append ( { " rankRange " : num_range , " rankedTitleListType " : " TITLE_METER " } )
out [ " rankedTitleListConstraint " ] = { " allRankedTitleLists " : ranges }
check_constraint ( " series " , [ ( " " , " any " ) , ( " not " , " exclude " ) ] , " episodicConstraint " , lower = " SeriesIds " )
check_constraint ( " list " , [ ( " " , " inAllLists " ) , ( " any " , " inAnyList " ) , ( " not " , " notInAnyList " ) ] , " listConstraint " )
if " company " in data :
company_ids = [ ]
for c in data [ " company " ] :
if c in company_options :
company_ids . extend ( company_options [ c ] )
else :
company_ids . append ( c )
out [ " creditedCompanyConstraint " ] = { " anyCompanyIds " : company_ids }
check_constraint ( " content_rating " , [ ( " " , " anyRegionCertificateRatings " ) ] , " certificateConstraint " )
check_constraint ( " country " , [ ( " " , " all " ) , ( " any " , " any " ) , ( " not " , " exclude " ) , ( " origin " , " anyPrimary " ) ] , " originCountryConstraint " , lower = " Countries " )
check_constraint ( " keyword " , [ ( " " , " all " ) , ( " any " , " any " ) , ( " not " , " exclude " ) ] , " keywordConstraint " , lower = " Keywords " , translation = ( " " , " - " ) )
check_constraint ( " language " , [ ( " " , " all " ) , ( " any " , " any " ) , ( " not " , " exclude " ) , ( " primary " , " anyPrimary " ) ] , " languageConstraint " , lower = " Languages " )
check_constraint ( " cast " , [ ( " " , " all " ) , ( " any " , " any " ) , ( " not " , " exclude " ) ] , " creditedNameConstraint " , lower = " NameIds " )
check_constraint ( " runtime " , [ ( " gte " , " min " ) , ( " lte " , " max " ) ] , " runtimeConstraint " , range_name = " runtimeRangeMinutes " )
if " adult " in data and data [ " adult " ] :
out [ " explicitContentConstraint " ] = { " explicitContentFilter " : " INCLUDE_ADULT " }
else :
out [ " lsConst " ] = data [ " list_id " ]
out [ " sort " ] = { " by " : list_sort_by_options [ sort_by ] , " order " : sort_order . upper ( ) }
logger . trace ( out )
return {
" operationName " : " AdvancedTitleSearch " if search else " TitleListMainPage " ,
" variables " : out ,
" extensions " : { " persistedQuery " : { " version " : 1 , " sha256Hash " : self . search_hash if search else self . list_hash } }
}
def _pagination ( self , data , search = True ) :
json_obj = self . _graphql_json ( data , search = search )
item_count = 250 if search else 100
imdb_ids = [ ]
logger . ghost ( " Parsing Page 1 " )
response_json = self . _graph_request ( json_obj )
try :
search_data = response_json [ " data " ] [ " advancedTitleSearch " ] if search else response_json [ " data " ] [ " list " ] [ " titleListItemSearch " ]
total = search_data [ " total " ]
limit = data [ " limit " ]
if limit < 1 or total < limit :
limit = total
remainder = limit % item_count
if remainder == 0 :
remainder = item_count
num_of_pages = math . ceil ( int ( limit ) / item_count )
end_cursor = search_data [ " pageInfo " ] [ " endCursor " ]
imdb_ids . extend ( [ n [ " node " ] [ " title " ] [ " id " ] if search else n [ " listItem " ] [ " id " ] for n in search_data [ " edges " ] ] )
if num_of_pages > 1 :
for i in range ( 2 , num_of_pages + 1 ) :
start_num = ( i - 1 ) * item_count + 1
logger . ghost ( f " Parsing Page { i } / { num_of_pages } { start_num } - { limit if i == num_of_pages else i * item_count } " )
json_obj [ " variables " ] [ " after " ] = end_cursor
response_json = self . _graph_request ( json_obj )
search_data = response_json [ " data " ] [ " advancedTitleSearch " ] if search else response_json [ " data " ] [ " list " ] [ " titleListItemSearch " ]
end_cursor = search_data [ " pageInfo " ] [ " endCursor " ]
ids_found = [ n [ " node " ] [ " title " ] [ " id " ] if search else n [ " listItem " ] [ " id " ] for n in search_data [ " edges " ] ]
if i == num_of_pages :
ids_found = ids_found [ : remainder ]
imdb_ids . extend ( ids_found )
logger . exorcise ( )
if len ( imdb_ids ) > 0 :
return imdb_ids
raise Failed ( " IMDb Error: No IMDb IDs Found " )
except KeyError :
logger . error ( f " Response: { response_json } " )
raise
def _award ( self , data ) :
final_list = [ ]
if data [ " event_id " ] in self . events_validation :
event_data = self . get_event ( data [ " event_id " ] )
if data [ " event_year " ] == " all " :
event_years = self . events_validation [ data [ " event_id " ] ] [ " years " ]
elif data [ " event_year " ] == " latest " :
event_years = [ self . events_validation [ data [ " event_id " ] ] [ " years " ] [ 0 ] ]
else :
event_years = data [ " event_year " ]
for event_year in event_years :
for award , categories in event_data [ event_year ] . items ( ) :
if data [ " award_filter " ] and award not in data [ " award_filter " ] :
continue
for cat in categories :
if data [ " category_filter " ] and cat not in data [ " category_filter " ] :
continue
final_list . extend ( categories [ cat ] [ " winner " if data [ " winning " ] else " nominee " ] )
else :
event_year = self . get_event_years ( data [ " event_id " ] ) [ 0 ] if data [ " event_year " ] == " latest " else data [ " event_year " ] [ 0 ]
event_slug = f " { event_year } /1 " if " - " not in event_year else event_year . replace ( " - " , " / " )
for text in self . _request ( f " { base_url } /event/ { data [ ' event_id ' ] } / { event_slug } /?ref_=ev_eh " , xpath = " //div[@class= ' article ' ]/script/text() " ) [ 0 ] . split ( " \n " ) :
if text . strip ( ) . startswith ( " IMDbReactWidgets.NomineesWidget.push " ) :
jsonline = text . strip ( )
obj = json . loads ( jsonline [ jsonline . find ( ' { ' ) : - 3 ] )
for award in obj [ " nomineesWidgetModel " ] [ " eventEditionSummary " ] [ " awards " ] :
if data [ " award_filter " ] and award [ " awardName " ] not in data [ " award_filter " ] :
continue
for cat in award [ " categories " ] :
if data [ " category_filter " ] and cat [ " categoryName " ] not in data [ " category_filter " ] :
continue
for nom in cat [ " nominations " ] :
if data [ " winning " ] and not nom [ " isWinner " ] :
continue
imdb_id = next ( ( n [ " const " ] for n in nom [ " primaryNominees " ] + nom [ " secondaryNominees " ] if n [ " const " ] . startswith ( " tt " ) ) , None )
if imdb_id :
final_list . append ( imdb_id )
break
return final_list
def keywords ( self , imdb_id , language , ignore_cache = False ) :
imdb_keywords = { }
expired = None
if self . cache and not ignore_cache :
imdb_keywords , expired = self . cache . query_imdb_keywords ( imdb_id , self . cache . expiration )
if imdb_keywords and expired is False :
return imdb_keywords
keywords = self . _request ( f " { base_url } /title/ { imdb_id } /keywords " , language = language , xpath = " //td[@class= ' soda sodavote ' ] " )
if not keywords :
raise Failed ( f " IMDb Error: No Item Found for IMDb ID: { imdb_id } " )
for k in keywords :
name = k . xpath ( " div[@class= ' sodatext ' ]/a/text() " ) [ 0 ]
relevant = k . xpath ( " div[@class= ' did-you-know-actions ' ]/div/a/text() " ) [ 0 ] . strip ( )
if " of " in relevant :
result = re . search ( r " ( \ d+) of ( \ d+).* " , relevant )
imdb_keywords [ name ] = ( int ( result . group ( 1 ) ) , int ( result . group ( 2 ) ) )
else :
imdb_keywords [ name ] = ( 0 , 0 )
if self . cache and not ignore_cache :
self . cache . update_imdb_keywords ( expired , imdb_id , imdb_keywords , self . cache . expiration )
return imdb_keywords
def parental_guide ( self , imdb_id , ignore_cache = False ) :
parental_dict = { }
expired = None
if self . cache and not ignore_cache :
parental_dict , expired = self . cache . query_imdb_parental ( imdb_id , self . cache . expiration )
if parental_dict and expired is False :
return parental_dict
response = self . _request ( f " { base_url } /title/ { imdb_id } /parentalguide " )
for ptype in util . parental_types :
results = response . xpath ( f " //section[@id= ' advisory- { ptype } ' ]//span[contains(@class, ' ipl-status-pill ' )]/text() " )
if results :
parental_dict [ ptype ] = results [ 0 ] . strip ( )
else :
raise Failed ( f " IMDb Error: No Item Found for IMDb ID: { imdb_id } " )
if self . cache and not ignore_cache :
self . cache . update_imdb_parental ( expired , imdb_id , parental_dict , self . cache . expiration )
return parental_dict
def _ids_from_chart ( self , chart , language ) :
if chart not in chart_urls :
raise Failed ( f " IMDb Error: chart: { chart } not " )
script_data = self . _request ( f " { base_url } / { chart_urls [ chart ] } " , language = language , xpath = " //script[@id= ' __NEXT_DATA__ ' ]/text() " ) [ 0 ]
return [ x . group ( 1 ) for x in re . finditer ( r ' " (tt \ d+) " ' , script_data ) ]
def get_imdb_ids ( self , method , data , language ) :
if method == " imdb_id " :
logger . info ( f " Processing IMDb ID: { data } " )
return [ ( data , " imdb " ) ]
elif method == " imdb_list " :
logger . info ( f " Processing IMDb List: { data [ ' list_id ' ] } " )
if data [ " limit " ] > 0 :
logger . info ( f " Limit: { data [ ' limit ' ] } " )
if " sort_by " in data :
logger . info ( f " Sort By: { data [ ' sort_by ' ] } " )
return [ ( i , " imdb " ) for i in self . _pagination ( data , search = False ) ]
elif method == " imdb_chart " :
logger . info ( f " Processing IMDb Chart: { charts [ data ] } " )
return [ ( _i , " imdb " ) for _i in self . _ids_from_chart ( data , language ) ]
elif method == " imdb_watchlist " :
logger . info ( f " Processing IMDb Watchlist: { data } " )
return [ ( _i , " imdb " ) for _i in self . _watchlist ( data , language ) ]
elif method == " imdb_award " :
if data [ " event_year " ] not in [ " all " , " latest " ] and len ( data [ " event_year " ] ) == 1 :
event_slug = f " { data [ ' event_year ' ] [ 0 ] } /1 " if " - " not in data [ " event_year " ] [ 0 ] else data [ " event_year " ] [ 0 ] . replace ( " - " , " / " )
logger . info ( f " Processing IMDb Award: { base_url } /event/ { data [ ' event_id ' ] } / { event_slug } /?ref_=ev_eh " )
else :
logger . info ( f " Processing IMDb Award: { data [ ' event_id ' ] } " )
logger . info ( f " event_year: { data [ ' event_year ' ] } " )
for k in [ " award_filter " , " category_filter " , " winning " ] :
logger . info ( f " { k } : { data [ k ] } " )
return [ ( _i , " imdb " ) for _i in self . _award ( data ) ]
elif method == " imdb_search " :
logger . info ( f " Processing IMDb Search: " )
for k , v in data . items ( ) :
logger . info ( f " { k } : { v } " )
return [ ( _i , " imdb " ) for _i in self . _pagination ( data ) ]
else :
raise Failed ( f " IMDb Error: Method { method } not supported " )
def _interface ( self , interface ) :
gz = os . path . join ( self . default_dir , f " title. { interface } .tsv.gz " )
tsv = os . path . join ( self . default_dir , f " title. { interface } .tsv " )
if os . path . exists ( gz ) :
os . remove ( gz )
if os . path . exists ( tsv ) :
os . remove ( tsv )
self . requests . get_stream ( f " https://datasets.imdbws.com/title. { interface } .tsv.gz " , gz , " IMDb Interface " )
with open ( tsv , " wb " ) as f_out :
with gzip . open ( gz , " rb " ) as f_in :
shutil . copyfileobj ( f_in , f_out )
with open ( tsv , " r " , encoding = " utf-8 " ) as t :
if interface == " ratings " :
data = { line [ 0 ] : line [ 1 ] for line in csv . reader ( t , delimiter = " \t " ) }
elif interface == " basics " :
data = { line [ 0 ] : str ( line [ - 1 ] ) . split ( " , " ) for line in csv . reader ( t , delimiter = " \t " ) if str ( line [ - 1 ] ) != " \\ N " }
else :
data = [ line for line in csv . reader ( t , delimiter = " \t " ) ]
if os . path . exists ( gz ) :
os . remove ( gz )
if os . path . exists ( tsv ) :
os . remove ( tsv )
return data
@property
def ratings ( self ) :
if self . _ratings is None :
self . _ratings = self . _interface ( " ratings " )
return self . _ratings
@property
def genres ( self ) :
if self . _genres is None :
self . _genres = self . _interface ( " basics " )
return self . _genres
@property
def episode_ratings ( self ) :
if self . _episode_ratings is None :
self . _episode_ratings = { }
for imdb_id , parent_id , season_num , episode_num in self . _interface ( " episode " ) :
if imdb_id not in self . ratings :
continue
if parent_id not in self . _episode_ratings :
self . _episode_ratings [ parent_id ] = { }
if season_num not in self . _episode_ratings [ parent_id ] :
self . _episode_ratings [ parent_id ] [ season_num ] = { }
self . _episode_ratings [ parent_id ] [ season_num ] [ episode_num ] = self . ratings [ imdb_id ]
return self . _episode_ratings
def get_rating ( self , imdb_id ) :
return self . ratings [ imdb_id ] if imdb_id in self . ratings else None
def get_genres ( self , imdb_id ) :
return self . genres [ imdb_id ] if imdb_id in self . genres else [ ]
def get_episode_rating ( self , imdb_id , season_num , episode_num ) :
season_num = str ( season_num )
episode_num = str ( episode_num )
if imdb_id not in self . episode_ratings or season_num not in self . episode_ratings [ imdb_id ] or episode_num not in self . episode_ratings [ imdb_id ] [ season_num ] :
return None
return self . episode_ratings [ imdb_id ] [ season_num ] [ episode_num ]
def item_filter ( self , imdb_info , filter_attr , modifier , filter_final , filter_data ) :
if filter_attr == " imdb_keyword " :
mr = filter_data [ " minimum_relevant " ]
mv = filter_data [ " minimum_votes " ]
mp = filter_data [ " minimum_percentage " ]
attrs = [ k for k , ( r , v ) in imdb_info . items ( ) if r > = mr and v > = mv and ( v == 0 or r / v > = mp ) ]
if modifier == " .regex " :
has_match = False
for reg in filter_data :
for name in attrs :
if re . compile ( reg ) . search ( name ) :
has_match = True
if has_match is False :
return False
elif modifier in [ " .count_gt " , " .count_gte " , " .count_lt " , " .count_lte " ] :
test_number = len ( attrs ) if attrs else 0
modifier = f " . { modifier [ 7 : ] } "
if test_number is None or util . is_number_filter ( test_number , modifier , filter_data ) :
return False
elif ( not list ( set ( filter_data [ " keywords " ] ) & set ( attrs ) ) and modifier == " " ) \
or ( list ( set ( filter_data [ " keywords " ] ) & set ( attrs ) ) and modifier == " .not " ) :
return False
return True