You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Plex-Meta-Manager/modules/imdb.py

132 lines
7.3 KiB

import logging, math, re, requests, time
from lxml import html
from modules import util
from modules.util import Failed
from retrying import retry
logger = logging.getLogger("Plex Meta Manager")
class IMDbAPI:
def __init__(self, Cache=None, TMDb=None, Trakt=None, TVDb=None):
if TMDb is None and Trakt is None:
raise Failed("IMDb Error: IMDb requires either TMDb or Trakt")
self.Cache = Cache
self.TMDb = TMDb
self.Trakt = Trakt
self.TVDb = TVDb
def get_imdb_ids_from_url(self, imdb_url, language, limit):
imdb_url = imdb_url.strip()
if not imdb_url.startswith("https://www.imdb.com/list/ls") and not imdb_url.startswith("https://www.imdb.com/search/title/?"):
raise Failed("IMDb Error: {} must begin with either:\n| https://www.imdb.com/list/ls (For Lists)\n| https://www.imdb.com/search/title/? (For Searches)".format(imdb_url))
if imdb_url.startswith("https://www.imdb.com/list/ls"):
try: list_id = re.search("(\\d+)", str(imdb_url)).group(1)
except AttributeError: raise Failed("IMDb Error: Failed to parse List ID from {}".format(imdb_url))
current_url = "https://www.imdb.com/search/title/?lists=ls{}".format(list_id)
else:
current_url = imdb_url
header = {"Accept-Language": language}
length = 0
imdb_ids = []
response = self.send_request(current_url, header)
try: results = html.fromstring(response).xpath("//div[@class='desc']/span/text()")[0].replace(",", "")
except IndexError: raise Failed("IMDb Error: Failed to parse URL: {}".format(imdb_url))
try: total = int(re.findall("(\\d+) title", results)[0])
except IndexError: raise Failed("IMDb Error: No Results at URL: {}".format(imdb_url))
if "&start=" in current_url: current_url = re.sub("&start=\d+", "", current_url)
if "&count=" in current_url: current_url = re.sub("&count=\d+", "", current_url)
if limit < 1 or total < limit: limit = total
remainder = limit % 250
if remainder == 0: remainder = 250
num_of_pages = math.ceil(int(limit) / 250)
for i in range(1, num_of_pages + 1):
start_num = (i - 1) * 250 + 1
length = util.print_return(length, "Parsing Page {}/{} {}-{}".format(i, num_of_pages, start_num, limit if i == num_of_pages else i * 250))
response = self.send_request("{}&count={}&start={}".format(current_url, remainder if i == num_of_pages else 250, start_num), header)
imdb_ids.extend(html.fromstring(response).xpath("//div[contains(@class, 'lister-item-image')]//a/img//@data-tconst"))
util.print_end(length)
if imdb_ids: return imdb_ids
else: raise Failed("IMDb Error: No Movies Found at {}".format(imdb_url))
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_request(self, url, header):
return requests.get(url, headers=header).content
def get_items(self, method, data, language, status_message=True):
pretty = util.pretty_names[method] if method in util.pretty_names else method
if status_message:
logger.debug("Data: {}".format(data))
show_ids = []
movie_ids = []
if method == "imdb_id":
if status_message:
logger.info("Processing {}: {}".format(pretty, data))
tmdb_id, tvdb_id = self.convert_from_imdb(data, language)
if tmdb_id: movie_ids.append(tmdb_id)
if tvdb_id: show_ids.append(tvdb_id)
elif method == "imdb_list":
if status_message:
logger.info("Processing {}: {}".format(pretty,"{} Items at {}".format(data["limit"], data["url"]) if data["limit"] > 0 else data["url"]))
imdb_ids = self.get_imdb_ids_from_url(data["url"], language, data["limit"])
total_ids = len(imdb_ids)
length = 0
for i, imdb_id in enumerate(imdb_ids, 1):
length = util.print_return(length, "Converting IMDb ID {}/{}".format(i, total_ids))
try:
tmdb_id, tvdb_id = self.convert_from_imdb(imdb_id, language)
if tmdb_id: movie_ids.append(tmdb_id)
if tvdb_id: show_ids.append(tvdb_id)
except Failed as e: logger.warning(e)
util.print_end(length, "Processed {} IMDb IDs".format(total_ids))
else:
raise Failed("IMDb Error: Method {} not supported".format(method))
if status_message:
logger.debug("TMDb IDs Found: {}".format(movie_ids))
logger.debug("TVDb IDs Found: {}".format(show_ids))
return movie_ids, show_ids
def convert_from_imdb(self, imdb_id, language):
if self.Cache:
tmdb_id, tvdb_id = self.Cache.get_ids_from_imdb(imdb_id)
update_tmdb = False
if not tmdb_id:
tmdb_id, update_tmdb = self.Cache.get_tmdb_from_imdb(imdb_id)
if update_tmdb:
tmdb_id = None
update_tvdb = False
if not tvdb_id:
tvdb_id, update_tvdb = self.Cache.get_tvdb_from_imdb(imdb_id)
if update_tvdb:
tvdb_id = None
else:
tmdb_id = None
tvdb_id = None
from_cache = tmdb_id is not None or tvdb_id is not None
if not tmdb_id and not tvdb_id and self.TMDb:
try: tmdb_id = self.TMDb.convert_imdb_to_tmdb(imdb_id)
except Failed: pass
if not tmdb_id and not tvdb_id and self.TMDb:
try: tvdb_id = self.TMDb.convert_imdb_to_tvdb(imdb_id)
except Failed: pass
if not tmdb_id and not tvdb_id and self.Trakt:
try: tmdb_id = self.Trakt.convert_imdb_to_tmdb(imdb_id)
except Failed: pass
if not tmdb_id and not tvdb_id and self.Trakt:
try: tvdb_id = self.Trakt.convert_imdb_to_tvdb(imdb_id)
except Failed: pass
try:
if tmdb_id and not from_cache: self.TMDb.get_movie(tmdb_id)
except Failed: tmdb_id = None
try:
if tvdb_id and not from_cache: self.TVDb.get_series(language, tvdb_id=tvdb_id)
except Failed: tvdb_id = None
if not tmdb_id and not tvdb_id : raise Failed("IMDb Error: No TMDb ID or TVDb ID found for IMDb: {}".format(imdb_id))
if self.Cache:
if tmdb_id and update_tmdb is not False:
self.Cache.update_imdb("movie", update_tmdb, imdb_id, tmdb_id)
if tvdb_id and update_tvdb is not False:
self.Cache.update_imdb("show", update_tvdb, imdb_id, tvdb_id)
return tmdb_id, tvdb_id