#306 Added a Session

pull/351/head
meisnate12 3 years ago
parent 97a01a6496
commit 2f15564e62

@ -1,52 +1,48 @@
import logging, requests, time
from lxml import html
import logging, time
from modules import util
from modules.util import Failed
from retrying import retry
logger = logging.getLogger("Plex Meta Manager")
builders = ["anidb_id", "anidb_relation", "anidb_popular", "anidb_tag"]
base_url = "https://anidb.net"
urls = {
"anime": f"{base_url}/anime",
"popular": f"{base_url}/latest/anime/popular/?h=1",
"relation": "/relation/graph",
"tag": f"{base_url}/tag",
"login": f"{base_url}/perl-bin/animedb.pl"
}
class AniDB:
def __init__(self, params, config):
def __init__(self, config, params):
self.config = config
self.urls = {
"anime": "https://anidb.net/anime",
"popular": "https://anidb.net/latest/anime/popular/?h=1",
"relation": "/relation/graph",
"anidb_tag": "https://anidb.net/tag",
"login": "https://anidb.net/perl-bin/animedb.pl"
}
self.username = params["username"] if params else None
self.password = params["password"] if params else None
if params:
if not self._login(params["username"], params["password"]).xpath("//li[@class='sub-menu my']/@title"):
if not self._login(self.username, self.password).xpath("//li[@class='sub-menu my']/@title"):
raise Failed("AniDB Error: Login failed")
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _request(self, url, language):
return html.fromstring(self.config.session.get(url, headers={"Accept-Language": language, "User-Agent": "Mozilla/5.0 x64"}).content)
def _request(self, url, language=None, postData=None):
if postData:
return self.config.post_html(url, postData, headers=util.header(language))
else:
return self.config.get_html(url, headers=util.header(language))
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _login(self, username, password):
data = {
"show": "main",
"xuser": username,
"xpass": password,
"xdoautologin": "on"
}
return html.fromstring(self.config.session.post(self.urls["login"], data, headers={"Accept-Language": "en-US,en;q=0.5", "User-Agent": "Mozilla/5.0 x64"}).content)
data = {"show": "main", "xuser": username, "xpass": password, "xdoautologin": "on"}
return self._request(urls["login"], postData=data)
def _popular(self, language):
response = self._request(self.urls["popular"], language)
response = self._request(urls["popular"], language=language)
return util.get_int_list(response.xpath("//td[@class='name anime']/a/@href"), "AniDB ID")
def _relations(self, anidb_id, language):
response = self._request(f"{self.urls['anime']}/{anidb_id}{self.urls['relation']}", language)
response = self._request(f"{urls['anime']}/{anidb_id}{urls['relation']}", language=language)
return util.get_int_list(response.xpath("//area/@href"), "AniDB ID")
def _validate(self, anidb_id, language):
response = self._request(f"{self.urls['anime']}/{anidb_id}", language)
response = self._request(f"{urls['anime']}/{anidb_id}", language=language)
ids = response.xpath(f"//*[text()='a{anidb_id}']/text()")
if len(ids) > 0:
return util.regex_first_int(ids[0], "AniDB ID")
@ -65,16 +61,15 @@ class AniDB:
def _tag(self, tag, limit, language):
anidb_ids = []
current_url = f"{self.urls['anidb_tag']}/{tag}"
current_url = f"{urls['tag']}/{tag}"
while True:
response = self._request(current_url, language)
int_list = util.get_int_list(response.xpath("//td[@class='name main anime']/a/@href"), "AniDB ID")
anidb_ids.extend(int_list)
response = self._request(current_url, language=language)
anidb_ids.extend(util.get_int_list(response.xpath("//td[@class='name main anime']/a/@href"), "AniDB ID"))
next_page_list = response.xpath("//li[@class='next']/a/@href")
if len(anidb_ids) >= limit or len(next_page_list) == 0:
break
time.sleep(2)
current_url = f"https://anidb.net{next_page_list[0]}"
current_url = f"{base_url}{next_page_list[0]}"
return anidb_ids[:limit]
def get_items(self, method, data, language):

@ -1,4 +1,4 @@
import logging, requests, time
import logging, time
from modules import util
from modules.util import Failed
from retrying import retry
@ -19,13 +19,13 @@ pretty_names = {
"score": "Average Score",
"popular": "Popularity"
}
base_url = "https://graphql.anilist.co"
tag_query = "query{MediaTagCollection {name}}"
genre_query = "query{GenreCollection}"
class AniList:
def __init__(self, config):
self.config = config
self.url = "https://graphql.anilist.co"
self.tags = {}
self.genres = {}
self.tags = {t["name"].lower(): t["name"] for t in self._request(tag_query, {})["data"]["MediaTagCollection"]}
@ -33,7 +33,7 @@ class AniList:
@retry(stop_max_attempt_number=2, retry_on_exception=util.retry_if_not_failed)
def _request(self, query, variables):
response = requests.post(self.url, json={"query": query, "variables": variables})
response = self.config.post(base_url, json={"query": query, "variables": variables})
json_obj = response.json()
if "errors" in json_obj:
if json_obj['errors'][0]['message'] == "Too Many Requests.":

@ -2,7 +2,7 @@ import logging, os, re
from datetime import datetime, timedelta
from modules import anidb, anilist, icheckmovies, imdb, letterboxd, mal, plex, radarr, sonarr, tautulli, tmdb, trakttv, tvdb, util
from modules.util import Failed, ImageData
from PIL import Image, UnidentifiedImageError
from PIL import Image
from plexapi.exceptions import BadRequest, NotFound
from plexapi.video import Movie, Show
from urllib.parse import quote
@ -1238,7 +1238,7 @@ class CollectionBuilder:
indent = f"\n{' ' * level}"
conjunction = f"{'and' if is_all else 'or'}=1&"
for _key, _data in filter_dict.items():
attr, modifier, final = self._split(_key)
attr, modifier, final_attr = self._split(_key)
def build_url_arg(arg, mod=None, arg_s=None, mod_s=None):
arg_key = plex.search_translation[attr] if attr in plex.search_translation else attr
@ -1254,15 +1254,15 @@ class CollectionBuilder:
display_line = f"{indent}{param_s} {mod_s} {arg_s}"
return f"{arg_key}{mod}={arg}&", display_line
if final not in plex.searches and not final.startswith(("any", "all")):
raise Failed(f"Collection Error: {final} is not a valid {method} attribute")
elif final in plex.movie_only_searches and self.library.is_show:
raise Failed(f"Collection Error: {final} {method} attribute only works for movie libraries")
elif final in plex.show_only_searches and self.library.is_movie:
raise Failed(f"Collection Error: {final} {method} attribute only works for show libraries")
if final_attr not in plex.searches and not final_attr.startswith(("any", "all")):
raise Failed(f"Collection Error: {final_attr} is not a valid {method} attribute")
elif final_attr in plex.movie_only_searches and self.library.is_show:
raise Failed(f"Collection Error: {final_attr} {method} attribute only works for movie libraries")
elif final_attr in plex.show_only_searches and self.library.is_movie:
raise Failed(f"Collection Error: {final_attr} {method} attribute only works for show libraries")
elif _data is None:
raise Failed(f"Collection Error: {final} {method} attribute is blank")
elif final.startswith(("any", "all")):
raise Failed(f"Collection Error: {final_attr} {method} attribute is blank")
elif final_attr.startswith(("any", "all")):
dicts = util.get_list(_data)
results = ""
display_add = ""
@ -1274,7 +1274,7 @@ class CollectionBuilder:
display_add += inside_display
results += f"{conjunction if len(results) > 0 else ''}push=1&{inside_filter}pop=1&"
else:
validation = self.validate_attribute(attr, modifier, final, _data, validate, pairs=True)
validation = self.validate_attribute(attr, modifier, final_attr, _data, validate, pairs=True)
if validation is None:
continue
elif attr in plex.date_attributes and modifier in ["", ".not"]:
@ -1436,7 +1436,6 @@ class CollectionBuilder:
def add_to_collection(self):
name, collection_items = self.library.get_collection_name_and_items(self.obj if self.obj else self.name, self.smart_label_collection)
total = len(self.rating_keys)
max_length = len(str(total))
for i, item in enumerate(self.rating_keys, 1):
try:
current = self.fetch_item(item)
@ -1772,7 +1771,7 @@ class CollectionBuilder:
continue
og_image = os.path.join(overlay_folder, f"{rating_key}.png")
if os.path.exists(og_image):
self.library._upload_file_poster(item, og_image)
self.library.upload_file_poster(item, og_image)
os.remove(og_image)
self.config.Cache.update_image_map(item.ratingKey, self.library.original_mapping_name, "poster", "", "", "")

@ -1,5 +1,6 @@
import logging, os, requests
from datetime import datetime
from lxml import html
from modules import util
from modules.anidb import AniDB
from modules.anilist import AniList
@ -18,6 +19,7 @@ from modules.tmdb import TMDb
from modules.trakttv import Trakt
from modules.tvdb import TVDb
from modules.util import Failed
from retrying import retry
from ruamel import yaml
logger = logging.getLogger("Plex Meta Manager")
@ -230,7 +232,7 @@ class Config:
self.omdb = {}
try:
self.omdb["apikey"] = check_for_attribute(self.data, "apikey", parent="omdb", throw=True)
self.OMDb = OMDb(self.omdb, Cache=self.Cache)
self.OMDb = OMDb(self, self.omdb)
except Failed as e:
logger.error(e)
logger.info(f"OMDb Connection {'Failed' if self.OMDb is None else 'Successful'}")
@ -248,7 +250,7 @@ class Config:
self.trakt["client_secret"] = check_for_attribute(self.data, "client_secret", parent="trakt", throw=True)
self.trakt["config_path"] = self.config_path
authorization = self.data["trakt"]["authorization"] if "authorization" in self.data["trakt"] and self.data["trakt"]["authorization"] else None
self.Trakt = Trakt(self.trakt, authorization)
self.Trakt = Trakt(self, self.trakt, authorization)
except Failed as e:
logger.error(e)
logger.info(f"Trakt Connection {'Failed' if self.Trakt is None else 'Successful'}")
@ -266,7 +268,7 @@ class Config:
self.mal["client_secret"] = check_for_attribute(self.data, "client_secret", parent="mal", throw=True)
self.mal["config_path"] = self.config_path
authorization = self.data["mal"]["authorization"] if "authorization" in self.data["mal"] and self.data["mal"]["authorization"] else None
self.MyAnimeList = MyAnimeList(self.mal, self, authorization)
self.MyAnimeList = MyAnimeList(self, self.mal, authorization)
except Failed as e:
logger.error(e)
logger.info(f"My Anime List Connection {'Failed' if self.MyAnimeList is None else 'Successful'}")
@ -283,12 +285,12 @@ class Config:
try:
self.anidb["username"] = check_for_attribute(self.data, "username", parent="anidb", throw=True)
self.anidb["password"] = check_for_attribute(self.data, "password", parent="anidb", throw=True)
self.AniDB = AniDB(self.anidb, self)
self.AniDB = AniDB(self, self.anidb)
except Failed as e:
logger.error(e)
logger.info(f"My Anime List Connection {'Failed Continuing as Guest ' if self.MyAnimeList is None else 'Successful'}")
if self.AniDB is None:
self.AniDB = AniDB(None, self)
self.AniDB = AniDB(self, None)
self.TVDb = TVDb(self)
self.IMDb = IMDb(self)
@ -497,7 +499,7 @@ class Config:
radarr_params["quality_profile"] = check_for_attribute(lib, "quality_profile", parent="radarr", default=self.general["radarr"]["quality_profile"], req_default=True, save=False)
radarr_params["tag"] = check_for_attribute(lib, "tag", parent="radarr", var_type="lower_list", default=self.general["radarr"]["tag"], default_is_none=True, save=False)
radarr_params["search"] = check_for_attribute(lib, "search", parent="radarr", var_type="bool", default=self.general["radarr"]["search"], save=False)
library.Radarr = Radarr(radarr_params)
library.Radarr = Radarr(self, radarr_params)
except Failed as e:
util.print_stacktrace()
util.print_multiline(e, error=True)
@ -527,7 +529,7 @@ class Config:
sonarr_params["tag"] = check_for_attribute(lib, "tag", parent="sonarr", var_type="lower_list", default=self.general["sonarr"]["tag"], default_is_none=True, save=False)
sonarr_params["search"] = check_for_attribute(lib, "search", parent="sonarr", var_type="bool", default=self.general["sonarr"]["search"], save=False)
sonarr_params["cutoff_search"] = check_for_attribute(lib, "cutoff_search", parent="sonarr", var_type="bool", default=self.general["sonarr"]["cutoff_search"], save=False)
library.Sonarr = Sonarr(sonarr_params)
library.Sonarr = Sonarr(self, sonarr_params)
except Failed as e:
util.print_stacktrace()
util.print_multiline(e, error=True)
@ -544,7 +546,7 @@ class Config:
try:
tautulli_params["url"] = check_for_attribute(lib, "url", parent="tautulli", var_type="url", default=self.general["tautulli"]["url"], req_default=True, save=False)
tautulli_params["apikey"] = check_for_attribute(lib, "apikey", parent="tautulli", default=self.general["tautulli"]["apikey"], req_default=True, save=False)
library.Tautulli = Tautulli(tautulli_params)
library.Tautulli = Tautulli(self, tautulli_params)
except Failed as e:
util.print_stacktrace()
util.print_multiline(e, error=True)
@ -563,3 +565,22 @@ class Config:
util.separator()
def get_html(self, url, headers=None):
return html.fromstring(self.get(url, headers=headers).content)
def get_json(self, url, headers=None):
return self.get(url, headers=headers).json()
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def get(self, url, headers=None, params=None):
return self.session.get(url, headers=headers, params=params)
def post_html(self, url, data=None, json=None, headers=None):
return html.fromstring(self.post(url, data=data, json=json, headers=headers).content)
def post_json(self, url, data=None, json=None, headers=None):
return self.post(url, data=data, json=json, headers=headers).json()
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def post(self, url, data=None, json=None, headers=None):
return self.session.post(url, data=data, json=json, headers=headers)

@ -1,22 +1,17 @@
import logging, re, requests
from lxml import html
from modules import util
from modules.util import Failed
from plexapi.exceptions import BadRequest
from retrying import retry
logger = logging.getLogger("Plex Meta Manager")
arms_url = "https://relations.yuna.moe/api/ids"
anidb_url = "https://raw.githubusercontent.com/Anime-Lists/anime-lists/master/anime-list-master.xml"
class Convert:
def __init__(self, config):
self.config = config
self.arms_url = "https://relations.yuna.moe/api/ids"
self.anidb_url = "https://raw.githubusercontent.com/Anime-Lists/anime-lists/master/anime-list-master.xml"
self.AniDBIDs = self._get_anidb()
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _get_anidb(self):
return html.fromstring(requests.get(self.anidb_url).content)
self.AniDBIDs = self.config.get_html(anidb_url)
def _anidb(self, input_id, to_id, fail=False):
ids = self.AniDBIDs.xpath(f"//anime[contains(@anidbid, '{input_id}')]/@{to_id}")
@ -33,10 +28,6 @@ class Convert:
raise Failed(fail_text)
return [] if to_id == "imdbid" else None
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _request(self, ids):
return requests.post(self.arms_url, json=ids).json()
def _arms_ids(self, anilist_ids=None, anidb_ids=None, mal_ids=None):
all_ids = []
def collect_ids(ids, id_name):
@ -68,7 +59,7 @@ class Convert:
if len(unconverted_ids) > 0:
unconverted_id_sets.append(unconverted_ids)
for unconverted_id_set in unconverted_id_sets:
for anime_ids in self._request(unconverted_id_set):
for anime_ids in self.config.post_json(arms_url, json=unconverted_id_set):
if anime_ids:
if self.config.Cache:
self.config.Cache.update_anime_map(False, anime_ids)

@ -1,35 +1,31 @@
import logging, requests
from lxml import html
import logging
from modules import util
from modules.util import Failed
from retrying import retry
logger = logging.getLogger("Plex Meta Manager")
builders = ["icheckmovies_list", "icheckmovies_list_details"]
base_url = "https://www.icheckmovies.com/lists/"
class ICheckMovies:
def __init__(self, config):
self.config = config
self.list_url = "https://www.icheckmovies.com/lists/"
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _request(self, url, language):
return html.fromstring(requests.get(url, headers={"Accept-Language": language, "User-Agent": "Mozilla/5.0 x64"}).content)
def _request(self, url, language, xpath):
return self.config.get_html(url, headers=util.header(language)).xpath(xpath)
def _parse_list(self, list_url, language):
response = self._request(list_url, language)
imdb_urls = response.xpath("//a[@class='optionIcon optionIMDB external']/@href")
imdb_urls = self._request(list_url, language, "//a[@class='optionIcon optionIMDB external']/@href")
return [t[t.find("/tt") + 1:-1] for t in imdb_urls]
def get_list_description(self, list_url, language):
descriptions = self._request(list_url, language).xpath("//div[@class='span-19 last']/p/em/text()")
descriptions = self._request(list_url, language, "//div[@class='span-19 last']/p/em/text()")
return descriptions[0] if len(descriptions) > 0 and len(descriptions[0]) > 0 else None
def validate_icheckmovies_list(self, list_url, language):
list_url = list_url.strip()
if not list_url.startswith(self.list_url):
raise Failed(f"ICheckMovies Error: {list_url} must begin with: {self.list_url}")
if not list_url.startswith(base_url):
raise Failed(f"ICheckMovies Error: {list_url} must begin with: {base_url}")
if len(self._parse_list(list_url, language)) > 0:
return list_url
raise Failed(f"ICheckMovies Error: {list_url} failed to parse")

@ -1,45 +1,44 @@
import logging, math, re, requests
from lxml import html
import logging, math, re, time
from modules import util
from modules.util import Failed
from retrying import retry
logger = logging.getLogger("Plex Meta Manager")
builders = ["imdb_list", "imdb_id"]
base_url = "https://www.imdb.com"
urls = {
"list": f"{base_url}/list/ls",
"search": f"{base_url}/search/title/?",
"keyword": f"{base_url}/search/keyword/?"
}
class IMDb:
def __init__(self, config):
self.config = config
self.urls = {
"list": "https://www.imdb.com/list/ls",
"search": "https://www.imdb.com/search/title/?",
"keyword": "https://www.imdb.com/search/keyword/?"
}
def validate_imdb_url(self, imdb_url, language):
imdb_url = imdb_url.strip()
if not imdb_url.startswith(self.urls["list"]) and not imdb_url.startswith(self.urls["search"]) and not imdb_url.startswith(self.urls["keyword"]):
raise Failed(f"IMDb Error: {imdb_url} must begin with either:\n{self.urls['list']} (For Lists)\n{self.urls['search']} (For Searches)\n{self.urls['keyword']} (For Keyword Searches)")
if not imdb_url.startswith(urls["list"]) and not imdb_url.startswith(urls["search"]) and not imdb_url.startswith(urls["keyword"]):
raise Failed(f"IMDb Error: {imdb_url} must begin with either:\n{urls['list']} (For Lists)\n{urls['search']} (For Searches)\n{urls['keyword']} (For Keyword Searches)")
total, _ = self._total(self._fix_url(imdb_url), language)
if total > 0:
return imdb_url
raise Failed(f"IMDb Error: {imdb_url} failed to parse")
def _fix_url(self, imdb_url):
if imdb_url.startswith(self.urls["list"]):
if imdb_url.startswith(urls["list"]):
try: list_id = re.search("(\\d+)", str(imdb_url)).group(1)
except AttributeError: raise Failed(f"IMDb Error: Failed to parse List ID from {imdb_url}")
return f"{self.urls['search']}lists=ls{list_id}"
return f"{urls['search']}lists=ls{list_id}"
elif imdb_url.endswith("/"):
return imdb_url[:-1]
else:
return imdb_url
def _total(self, imdb_url, language):
header = {"Accept-Language": language}
if imdb_url.startswith(self.urls["keyword"]):
results = self._request(imdb_url, header).xpath("//div[@class='desc']/text()")
headers = util.header(language)
if imdb_url.startswith(urls["keyword"]):
results = self.config.get_html(imdb_url, headers=headers).xpath("//div[@class='desc']/text()")
total = None
for result in results:
if "title" in result:
@ -52,7 +51,7 @@ class IMDb:
raise Failed(f"IMDb Error: No Results at URL: {imdb_url}")
return total, 50
else:
try: results = self._request(imdb_url, header).xpath("//div[@class='desc']/span/text()")[0].replace(",", "")
try: results = self.config.get_html(imdb_url, headers=headers).xpath("//div[@class='desc']/span/text()")[0].replace(",", "")
except IndexError: raise Failed(f"IMDb Error: Failed to parse URL: {imdb_url}")
try: total = int(re.findall("(\\d+) title", results)[0])
except IndexError: raise Failed(f"IMDb Error: No Results at URL: {imdb_url}")
@ -61,7 +60,7 @@ class IMDb:
def _ids_from_url(self, imdb_url, language, limit):
current_url = self._fix_url(imdb_url)
total, item_count = self._total(current_url, language)
header = {"Accept-Language": language}
headers = util.header(language)
imdb_ids = []
if "&start=" in current_url: current_url = re.sub("&start=\\d+", "", current_url)
if "&count=" in current_url: current_url = re.sub("&count=\\d+", "", current_url)
@ -74,22 +73,19 @@ class IMDb:
for i in range(1, num_of_pages + 1):
start_num = (i - 1) * item_count + 1
util.print_return(f"Parsing Page {i}/{num_of_pages} {start_num}-{limit if i == num_of_pages else i * item_count}")
if imdb_url.startswith(self.urls["keyword"]):
response = self._request(f"{current_url}&page={i}", header)
if imdb_url.startswith(urls["keyword"]):
response = self.config.get_html(f"{current_url}&page={i}", headers=headers)
else:
response = self._request(f"{current_url}&count={remainder if i == num_of_pages else item_count}&start={start_num}", header)
if imdb_url.startswith(self.urls["keyword"]) and i == num_of_pages:
response = self.config.get_html(f"{current_url}&count={remainder if i == num_of_pages else item_count}&start={start_num}", headers=headers)
if imdb_url.startswith(urls["keyword"]) and i == num_of_pages:
imdb_ids.extend(response.xpath("//div[contains(@class, 'lister-item-image')]//a/img//@data-tconst")[:remainder])
else:
imdb_ids.extend(response.xpath("//div[contains(@class, 'lister-item-image')]//a/img//@data-tconst"))
time.sleep(2)
util.print_end()
if imdb_ids: return imdb_ids
else: raise Failed(f"IMDb Error: No IMDb IDs Found at {imdb_url}")
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _request(self, url, header):
return html.fromstring(requests.get(url, headers=header).content)
def get_items(self, method, data, language, is_movie):
pretty = util.pretty_names[method] if method in util.pretty_names else method
show_ids = []

@ -1,24 +1,18 @@
import logging, requests
from lxml import html
import logging, time
from modules import util
from modules.util import Failed
from retrying import retry
logger = logging.getLogger("Plex Meta Manager")
builders = ["letterboxd_list", "letterboxd_list_details"]
base_url = "https://letterboxd.com"
class Letterboxd:
def __init__(self, config):
self.config = config
self.url = "https://letterboxd.com"
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _request(self, url, language):
return html.fromstring(requests.get(url, headers={"Accept-Language": language, "User-Agent": "Mozilla/5.0 x64"}).content)
def _parse_list(self, list_url, language):
response = self._request(list_url, language)
response = self.config.get_html(list_url, headers=util.header(language))
letterboxd_ids = response.xpath("//div[@class='poster film-poster really-lazy-load']/@data-film-id")
items = []
for letterboxd_id in letterboxd_ids:
@ -26,11 +20,12 @@ class Letterboxd:
items.append((letterboxd_id, slugs[0]))
next_url = response.xpath("//a[@class='next']/@href")
if len(next_url) > 0:
items.extend(self._parse_list(f"{self.url}{next_url[0]}", language))
time.sleep(2)
items.extend(self._parse_list(f"{base_url}{next_url[0]}", language))
return items
def _tmdb(self, letterboxd_url, language):
response = self._request(letterboxd_url, language)
response = self.config.get_html(letterboxd_url, headers=util.header(language))
ids = response.xpath("//a[@data-track-action='TMDb']/@href")
if len(ids) > 0 and ids[0]:
if "themoviedb.org/movie" in ids[0]:
@ -39,7 +34,8 @@ class Letterboxd:
raise Failed(f"Letterboxd Error: TMDb Movie ID not found at {letterboxd_url}")
def get_list_description(self, list_url, language):
descriptions = self._request(list_url, language).xpath("//meta[@property='og:description']/@content")
response = self.config.get_html(list_url, headers=util.header(language))
descriptions = response.xpath("//meta[@property='og:description']/@content")
return descriptions[0] if len(descriptions) > 0 and len(descriptions[0]) > 0 else None
def get_items(self, method, data, language):
@ -58,7 +54,7 @@ class Letterboxd:
tmdb_id, expired = self.config.Cache.query_letterboxd_map(letterboxd_id)
if not tmdb_id or expired is not False:
try:
tmdb_id = self._tmdb(f"{self.url}{slug}", language)
tmdb_id = self._tmdb(f"{base_url}{slug}", language)
except Failed as e:
logger.error(e)
continue

@ -1,7 +1,6 @@
import logging, re, requests, secrets, webbrowser
import logging, re, secrets, webbrowser
from modules import util
from modules.util import Failed, TimeoutExpired
from retrying import retry
from ruamel import yaml
logger = logging.getLogger("Plex Meta Manager")
@ -71,18 +70,17 @@ userlist_status = [
"dropped",
"plan_to_watch"
]
urls = {
"oauth_token": "https://myanimelist.net/v1/oauth2/token",
"oauth_authorize": "https://myanimelist.net/v1/oauth2/authorize",
"ranking": "https://api.myanimelist.net/v2/anime/ranking",
"season": "https://api.myanimelist.net/v2/anime/season",
"suggestions": "https://api.myanimelist.net/v2/anime/suggestions",
"user": "https://api.myanimelist.net/v2/users"
}
class MyAnimeList:
def __init__(self, params, config, authorization=None):
def __init__(self, config, params, authorization=None):
self.config = config
self.urls = {
"oauth_token": "https://myanimelist.net/v1/oauth2/token",
"oauth_authorize": "https://myanimelist.net/v1/oauth2/authorize",
"ranking": "https://api.myanimelist.net/v2/anime/ranking",
"season": "https://api.myanimelist.net/v2/anime/season",
"suggestions": "https://api.myanimelist.net/v2/anime/suggestions",
"user": "https://api.myanimelist.net/v2/users"
}
self.client_id = params["client_id"]
self.client_secret = params["client_secret"]
self.config_path = params["config_path"]
@ -93,7 +91,7 @@ class MyAnimeList:
def _authorization(self):
code_verifier = secrets.token_urlsafe(100)[:128]
url = f"{self.urls['oauth_authorize']}?response_type=code&client_id={self.client_id}&code_challenge={code_verifier}"
url = f"{urls['oauth_authorize']}?response_type=code&client_id={self.client_id}&code_challenge={code_verifier}"
logger.info("")
logger.info(f"Navigate to: {url}")
logger.info("")
@ -122,7 +120,7 @@ class MyAnimeList:
def _check(self, authorization):
try:
self._request(self.urls["suggestions"], authorization=authorization)
self._request(urls["suggestions"], authorization=authorization)
return True
except Failed as e:
logger.debug(e)
@ -158,14 +156,12 @@ class MyAnimeList:
return True
return False
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _oauth(self, data):
return requests.post(self.urls["oauth_token"], data).json()
return self.config.post_json(urls["oauth_token"], data=data)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def _request(self, url, authorization=None):
new_authorization = authorization if authorization else self.authorization
response = requests.get(url, headers={"Authorization": f"Bearer {new_authorization['access_token']}"}).json()
response = self.config.get_json(url, headers={"Authorization": f"Bearer {new_authorization['access_token']}"})
if "error" in response: raise Failed(f"MyAnimeList Error: {response['error']}")
else: return response
@ -174,23 +170,23 @@ class MyAnimeList:
return [d["node"]["id"] for d in data["data"]] if "data" in data else []
def _username(self):
return self._request(f"{self.urls['user']}/@me")["name"]
return self._request(f"{urls['user']}/@me")["name"]
def _ranked(self, ranking_type, limit):
url = f"{self.urls['ranking']}?ranking_type={ranking_type}&limit={limit}"
url = f"{urls['ranking']}?ranking_type={ranking_type}&limit={limit}"
return self._parse_request(url)
def _season(self, season, year, sort_by, limit):
url = f"{self.urls['season']}/{year}/{season}?sort={sort_by}&limit={limit}"
url = f"{urls['season']}/{year}/{season}?sort={sort_by}&limit={limit}"
return self._parse_request(url)
def _suggestions(self, limit):
url = f"{self.urls['suggestions']}?limit={limit}"
url = f"{urls['suggestions']}?limit={limit}"
return self._parse_request(url)
def _userlist(self, username, status, sort_by, limit):
final_status = "" if status == "all" else f"status={status}&"
url = f"{self.urls['user']}/{username}/animelist?{final_status}sort={sort_by}&limit={limit}"
url = f"{urls['user']}/{username}/animelist?{final_status}sort={sort_by}&limit={limit}"
return self._parse_request(url)
def get_items(self, method, data):

@ -1,4 +1,4 @@
import logging, os, re, requests
import logging, os, re
from datetime import datetime
from modules import plex, util
from modules.util import Failed, ImageData
@ -7,13 +7,14 @@ from ruamel import yaml
logger = logging.getLogger("Plex Meta Manager")
github_base = "https://raw.githubusercontent.com/meisnate12/Plex-Meta-Manager-Configs/master/"
class Metadata:
def __init__(self, config, library, file_type, path):
self.config = config
self.library = library
self.type = file_type
self.path = path
self.github_base = "https://raw.githubusercontent.com/meisnate12/Plex-Meta-Manager-Configs/master/"
logger.info("")
logger.info(f"Loading Metadata {file_type}: {path}")
def get_dict(attribute, attr_data, check_list=None):
@ -37,8 +38,8 @@ class Metadata:
return None
try:
if file_type in ["URL", "Git"]:
content_path = path if file_type == "URL" else f"{self.github_base}{path}.yml"
response = requests.get(content_path)
content_path = path if file_type == "URL" else f"{github_base}{path}.yml"
response = self.config.get(content_path)
if response.status_code >= 400:
raise Failed(f"URL Error: No file found at {content_path}")
content = response.content

@ -1,10 +1,11 @@
import logging, requests
import logging
from modules import util
from modules.util import Failed
from retrying import retry
logger = logging.getLogger("Plex Meta Manager")
base_url = "http://www.omdbapi.com/"
class OMDbObj:
def __init__(self, imdb_id, data):
self._imdb_id = imdb_id
@ -35,25 +36,23 @@ class OMDbObj:
self.type = data["Type"]
class OMDb:
def __init__(self, params, Cache=None):
self.url = "http://www.omdbapi.com/"
def __init__(self, config, params):
self.config = config
self.apikey = params["apikey"]
self.limit = False
self.Cache = Cache
self.get_omdb("tt0080684")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def get_omdb(self, imdb_id):
expired = None
if self.Cache:
omdb_dict, expired = self.Cache.query_omdb(imdb_id)
if self.config.Cache:
omdb_dict, expired = self.config.Cache.query_omdb(imdb_id)
if omdb_dict and expired is False:
return OMDbObj(imdb_id, omdb_dict)
response = requests.get(self.url, params={"i": imdb_id, "apikey": self.apikey})
response = self.config.get(base_url, params={"i": imdb_id, "apikey": self.apikey})
if response.status_code < 400:
omdb = OMDbObj(imdb_id, response.json())
if self.Cache:
self.Cache.update_omdb(expired, omdb)
if self.config.Cache:
self.config.Cache.update_omdb(expired, omdb)
return omdb
else:
error = response.json()['Error']

@ -255,8 +255,12 @@ sort_types = {
class Plex:
def __init__(self, config, params):
self.config = config
self.plex = params["plex"]
self.url = params["plex"]["url"]
self.token = params["plex"]["token"]
self.timeout = params["plex"]["timeout"]
try:
self.PlexServer = PlexServer(params["plex"]["url"], params["plex"]["token"], timeout=params["plex"]["timeout"])
self.PlexServer = PlexServer(baseurl=self.url, token=self.token, session=self.config.session, timeout=self.timeout)
except Unauthorized:
raise Failed("Plex Error: Plex token is invalid")
except ValueError as e:
@ -322,10 +326,6 @@ class Plex:
self.radarr_add_all = params["radarr_add_all"]
self.sonarr_add_all = params["sonarr_add_all"]
self.mass_update = self.mass_genre_update or self.mass_audience_rating_update or self.mass_critic_rating_update or self.split_duplicates or self.radarr_add_all or self.sonarr_add_all
self.plex = params["plex"]
self.url = params["plex"]["url"]
self.token = params["plex"]["token"]
self.timeout = params["plex"]["timeout"]
self.clean_bundles = params["plex"]["clean_bundles"]
self.empty_trash = params["plex"]["empty_trash"]
self.optimize = params["plex"]["optimize"]
@ -427,7 +427,7 @@ class Plex:
self.reload(item)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_plex)
def _upload_file_poster(self, item, image):
def upload_file_poster(self, item, image):
item.uploadPoster(filepath=image)
self.reload(item)
@ -470,7 +470,7 @@ class Plex:
new_poster = new_poster.resize(overlay_image.size, Image.ANTIALIAS)
new_poster.paste(overlay_image, (0, 0), overlay_image)
new_poster.save(temp_image)
self._upload_file_poster(item, temp_image)
self.upload_file_poster(item, temp_image)
poster_uploaded = True
logger.info(f"Detail: Overlay: {overlay_name} applied to {item.title}")
@ -772,9 +772,9 @@ class Plex:
name = os.path.basename(os.path.dirname(str(item.locations[0])) if self.is_movie else str(item.locations[0]))
logger.debug(name)
found_folder = False
poster = None
background = None
for ad in self.asset_directory:
poster = None
background = None
item_dir = None
if self.asset_folders:
if os.path.isdir(os.path.join(ad, name)):

@ -19,7 +19,8 @@ apply_tags_translation = {
}
class Radarr:
def __init__(self, params):
def __init__(self, config, params):
self.config = config
self.url = params["url"]
self.token = params["token"]
try:
@ -83,4 +84,3 @@ class Radarr:
logger.info("")
for tmdb_id in not_exists:
logger.info(f"TMDb ID Not in Radarr | {tmdb_id}")

@ -24,7 +24,8 @@ apply_tags_translation = {
}
class Sonarr:
def __init__(self, params):
def __init__(self, config, params):
self.config = config
self.url = params["url"]
self.token = params["token"]
try:

@ -1,15 +1,15 @@
import logging, requests
import logging
from modules import util
from modules.util import Failed
from plexapi.exceptions import BadRequest, NotFound
from retrying import retry
logger = logging.getLogger("Plex Meta Manager")
builders = ["tautulli_popular", "tautulli_watched"]
class Tautulli:
def __init__(self, params):
def __init__(self, config, params):
self.config = config
self.url = params["url"]
self.apikey = params["apikey"]
try:
@ -62,7 +62,6 @@ class Tautulli:
if section_id: return section_id
else: raise Failed(f"Tautulli Error: No Library named {library_name} in the response")
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _request(self, url):
logger.debug(f"Tautulli URL: {url.replace(self.apikey, '###############')}")
return requests.get(url).json()
return self.config.get_json(url)

@ -24,7 +24,8 @@ builders = [
]
class Trakt:
def __init__(self, params, authorization=None):
def __init__(self, config, params, authorization=None):
self.config = config
self.base_url = "https://api.trakt.tv"
self.redirect_uri = "urn:ietf:wg:oauth:2.0:oob"
self.aliases = {
@ -118,9 +119,8 @@ class Trakt:
if trakt_list is None: raise Failed("Trakt Error: No List found")
else: return trakt_list
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _request(self, url):
return requests.get(url, headers={"Content-Type": "application/json", "trakt-api-version": "2", "trakt-api-key": self.client_id}).json()
return self.config.get_json(url, headers={"Content-Type": "application/json", "trakt-api-version": "2", "trakt-api-key": self.client_id})
def _collection(self, username, is_movie):
items = self._request(f"{self.base_url}/users/{username}/collection/{'movies' if is_movie else 'shows'}")

@ -1,8 +1,6 @@
import logging, requests
from lxml import html
import logging, requests, time
from modules import util
from modules.util import Failed
from retrying import retry
logger = logging.getLogger("Plex Meta Manager")
@ -14,33 +12,48 @@ builders = [
"tvdb_show",
"tvdb_show_details"
]
base_url = "https://www.thetvdb.com"
alt_url = "https://thetvdb.com"
urls = {
"list": f"{base_url}/lists/",
"alt_list": f"{alt_url}/lists/",
"series": f"{base_url}/series/",
"alt_series": f"{alt_url}/series/",
"movies": f"{base_url}/movies/",
"alt_movies": f"{alt_url}/movies/",
"series_id": f"{base_url}/dereferrer/series/",
"movie_id": f"{base_url}/dereferrer/movie/"
}
class TVDbObj:
def __init__(self, tvdb_url, language, is_movie, TVDb):
tvdb_url = tvdb_url.strip()
if not is_movie and tvdb_url.startswith((TVDb.series_url, TVDb.alt_series_url, TVDb.series_id_url)):
def __init__(self, tvdb_url, language, is_movie, config):
self.tvdb_url = tvdb_url.strip()
self.language = language
self.is_movie = is_movie
self.config = config
if not self.is_movie and self.tvdb_url.startswith((urls["series"], urls["alt_series"], urls["series_id"])):
self.media_type = "Series"
elif is_movie and tvdb_url.startswith((TVDb.movies_url, TVDb.alt_movies_url, TVDb.movie_id_url)):
elif self.is_movie and self.tvdb_url.startswith((urls["movies"], urls["alt_movies"], urls["movie_id"])):
self.media_type = "Movie"
else:
raise Failed(f"TVDb Error: {tvdb_url} must begin with {TVDb.movies_url if is_movie else TVDb.series_url}")
raise Failed(f"TVDb Error: {self.tvdb_url} must begin with {urls['movies'] if self.is_movie else urls['series']}")
response = TVDb._request(tvdb_url, language)
response = self.config.get_html(self.tvdb_url, headers=util.header(self.language))
results = response.xpath(f"//*[text()='TheTVDB.com {self.media_type} ID']/parent::node()/span/text()")
if len(results) > 0:
self.id = int(results[0])
elif tvdb_url.startswith(TVDb.movie_id_url):
raise Failed(f"TVDb Error: Could not find a TVDb Movie using TVDb Movie ID: {tvdb_url[len(TVDb.movie_id_url):]}")
elif tvdb_url.startswith(TVDb.series_id_url):
raise Failed(f"TVDb Error: Could not find a TVDb Series using TVDb Series ID: {tvdb_url[len(TVDb.series_id_url):]}")
elif self.tvdb_url.startswith(urls["movie_id"]):
raise Failed(f"TVDb Error: Could not find a TVDb Movie using TVDb Movie ID: {self.tvdb_url[len(urls['movie_id']):]}")
elif self.tvdb_url.startswith(urls["series_id"]):
raise Failed(f"TVDb Error: Could not find a TVDb Series using TVDb Series ID: {self.tvdb_url[len(urls['series_id']):]}")
else:
raise Failed(f"TVDb Error: Could not find a TVDb {self.media_type} ID at the URL {tvdb_url}")
raise Failed(f"TVDb Error: Could not find a TVDb {self.media_type} ID at the URL {self.tvdb_url}")
results = response.xpath("//div[@class='change_translation_text' and @data-language='eng']/@data-title")
if len(results) > 0 and len(results[0]) > 0:
self.title = results[0]
else:
raise Failed(f"TVDb Error: Name not found from TVDb URL: {tvdb_url}")
raise Failed(f"TVDb Error: Name not found from TVDb URL: {self.tvdb_url}")
results = response.xpath("//div[@class='row hidden-xs hidden-sm']/div/img/@src")
self.poster_path = results[0] if len(results) > 0 and len(results[0]) > 0 else None
@ -52,7 +65,7 @@ class TVDbObj:
self.summary = results[0] if len(results) > 0 and len(results[0]) > 0 else None
tmdb_id = None
if is_movie:
if self.is_movie:
results = response.xpath("//*[text()='TheMovieDB.com']/@href")
if len(results) > 0:
try:
@ -63,70 +76,58 @@ class TVDbObj:
results = response.xpath("//*[text()='IMDB']/@href")
if len(results) > 0:
try:
tmdb_id = TVDb.config.Convert.imdb_to_tmdb(util.get_id_from_imdb_url(results[0]), fail=True)
tmdb_id = self.config.Convert.imdb_to_tmdb(util.get_id_from_imdb_url(results[0]), fail=True)
except Failed:
pass
if tmdb_id is None:
raise Failed(f"TVDB Error: No TMDb ID found for {self.title}")
self.tmdb_id = tmdb_id
self.tvdb_url = tvdb_url
self.language = language
self.is_movie = is_movie
self.TVDb = TVDb
class TVDb:
def __init__(self, config):
self.config = config
self.site_url = "https://www.thetvdb.com"
self.alt_site_url = "https://thetvdb.com"
self.list_url = f"{self.site_url}/lists/"
self.alt_list_url = f"{self.alt_site_url}/lists/"
self.series_url = f"{self.site_url}/series/"
self.alt_series_url = f"{self.alt_site_url}/series/"
self.movies_url = f"{self.site_url}/movies/"
self.alt_movies_url = f"{self.alt_site_url}/movies/"
self.series_id_url = f"{self.site_url}/dereferrer/series/"
self.movie_id_url = f"{self.site_url}/dereferrer/movie/"
def get_movie_or_series(self, language, tvdb_url, is_movie):
return self.get_movie(language, tvdb_url) if is_movie else self.get_series(language, tvdb_url)
def get_series(self, language, tvdb_url):
try:
tvdb_url = f"{self.series_id_url}{int(tvdb_url)}"
tvdb_url = f"{urls['series_id']}{int(tvdb_url)}"
except ValueError:
pass
return TVDbObj(tvdb_url, language, False, self)
return TVDbObj(tvdb_url, language, False, self.config)
def get_movie(self, language, tvdb_url):
try:
tvdb_url = f"{self.movie_id_url}{int(tvdb_url)}"
tvdb_url = f"{urls['movie_id']}{int(tvdb_url)}"
except ValueError:
pass
return TVDbObj(tvdb_url, language, True, self)
return TVDbObj(tvdb_url, language, True, self.config)
def get_list_description(self, tvdb_url, language):
description = self._request(tvdb_url, language).xpath("//div[@class='block']/div[not(@style='display:none')]/p/text()")
response = self.config.get_html(tvdb_url, headers=util.header(language))
description = response.xpath("//div[@class='block']/div[not(@style='display:none')]/p/text()")
return description[0] if len(description) > 0 and len(description[0]) > 0 else ""
def _ids_from_url(self, tvdb_url, language):
show_ids = []
movie_ids = []
tvdb_url = tvdb_url.strip()
if tvdb_url.startswith((self.list_url, self.alt_list_url)):
if tvdb_url.startswith((urls["list"], urls["alt_list"])):
try:
items = self._request(tvdb_url, language).xpath("//div[@class='col-xs-12 col-sm-12 col-md-8 col-lg-8 col-md-pull-4']/div[@class='row']")
response = self.config.get_html(tvdb_url, headers=util.header(language))
items = response.xpath("//div[@class='col-xs-12 col-sm-12 col-md-8 col-lg-8 col-md-pull-4']/div[@class='row']")
for item in items:
title = item.xpath(".//div[@class='col-xs-12 col-sm-9 mt-2']//a/text()")[0]
item_url = item.xpath(".//div[@class='col-xs-12 col-sm-9 mt-2']//a/@href")[0]
if item_url.startswith("/series/"):
try:
show_ids.append(self.get_series(language, f"{self.site_url}{item_url}").id)
show_ids.append(self.get_series(language, f"{base_url}{item_url}").id)
except Failed as e:
logger.error(f"{e} for series {title}")
elif item_url.startswith("/movies/"):
try:
tmdb_id = self.get_movie(language, f"{self.site_url}{item_url}").tmdb_id
tmdb_id = self.get_movie(language, f"{base_url}{item_url}").tmdb_id
if tmdb_id:
movie_ids.append(tmdb_id)
else:
@ -135,6 +136,7 @@ class TVDb:
logger.error(f"{e} for series {title}")
else:
logger.error(f"TVDb Error: Skipping Movie: {title}")
time.sleep(2)
if len(show_ids) > 0 or len(movie_ids) > 0:
return movie_ids, show_ids
raise Failed(f"TVDb Error: No TVDb IDs found at {tvdb_url}")
@ -142,11 +144,7 @@ class TVDb:
util.print_stacktrace()
raise Failed(f"TVDb Error: URL Lookup Failed for {tvdb_url}")
else:
raise Failed(f"TVDb Error: {tvdb_url} must begin with {self.list_url}")
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def _request(self, url, language):
return html.fromstring(requests.get(url, headers={"Accept-Language": language}).content)
raise Failed(f"TVDb Error: {tvdb_url} must begin with {urls['list']}")
def get_items(self, method, data, language):
pretty = util.pretty_names[method] if method in util.pretty_names else method

@ -280,6 +280,9 @@ def logger_input(prompt, timeout=60):
elif hasattr(signal, "SIGALRM"): return unix_input(prompt, timeout)
else: raise SystemError("Input Timeout not supported on this system")
def header(language="en-US,en;q=0.5"):
return {"Accept-Language": language, "User-Agent": "Mozilla/5.0 x64"}
def alarm_handler(signum, frame):
raise TimeoutExpired

Loading…
Cancel
Save