variable rename

pull/240/head
meisnate12 3 years ago
parent fa1b7f79f2
commit 4cd14dbfc6

@ -18,29 +18,29 @@ class AniDBAPI:
}
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_request(self, url, language):
def _request(self, url, language):
return html.fromstring(requests.get(url, headers={"Accept-Language": language, "User-Agent": "Mozilla/5.0 x64"}).content)
def get_popular(self, language):
response = self.send_request(self.urls["popular"], language)
def _popular(self, language):
response = self._request(self.urls["popular"], language)
return util.get_int_list(response.xpath("//td[@class='name anime']/a/@href"), "AniDB ID")
def validate_anidb_id(self, anidb_id, language):
response = self.send_request(f"{self.urls['anime']}/{anidb_id}", language)
def _relations(self, anidb_id, language):
response = self._request(f"{self.urls['anime']}/{anidb_id}{self.urls['relation']}", language)
return util.get_int_list(response.xpath("//area/@href"), "AniDB ID")
def _validate(self, anidb_id, language):
response = self._request(f"{self.urls['anime']}/{anidb_id}", language)
ids = response.xpath(f"//*[text()='a{anidb_id}']/text()")
if len(ids) > 0:
return util.regex_first_int(ids[0], "AniDB ID")
raise Failed(f"AniDB Error: AniDB ID: {anidb_id} not found")
def get_anidb_relations(self, anidb_id, language):
response = self.send_request(f"{self.urls['anime']}/{anidb_id}{self.urls['relation']}", language)
return util.get_int_list(response.xpath("//area/@href"), "AniDB ID")
def validate_anidb_list(self, anidb_list, language):
anidb_values = []
for anidb_id in anidb_list:
try:
anidb_values.append(self.validate_anidb_id(anidb_id, language))
anidb_values.append(self._validate(anidb_id, language))
except Failed as e:
logger.error(e)
if len(anidb_values) > 0:
@ -55,11 +55,11 @@ class AniDBAPI:
if method == "anidb_popular":
if status_message:
logger.info(f"Processing {pretty}: {data} Anime")
anidb_ids.extend(self.get_popular(language)[:data])
anidb_ids.extend(self._popular(language)[:data])
else:
if status_message: logger.info(f"Processing {pretty}: {data}")
if method == "anidb_id": anidb_ids.append(data)
elif method == "anidb_relation": anidb_ids.extend(self.get_anidb_relations(data, language))
elif method == "anidb_relation": anidb_ids.extend(self._relations(data, language))
else: raise Failed(f"AniDB Error: Method {method} not supported")
movie_ids, show_ids = self.config.Arms.anidb_to_ids(anidb_ids, language)
if status_message:

@ -19,6 +19,8 @@ pretty_names = {
"score": "Average Score",
"popular": "Popularity"
}
tag_query = "query{MediaTagCollection {name}}"
genre_query = "query{GenreCollection}"
class AniListAPI:
def __init__(self, config):
@ -26,19 +28,12 @@ class AniListAPI:
self.url = "https://graphql.anilist.co"
self.tags = {}
self.genres = {}
for tag in self.send_request("query{MediaTagCollection {name}}", {})["data"]["MediaTagCollection"]:
self.tags[tag["name"].lower()] = tag["name"]
for genre in self.send_request("query{GenreCollection}", {})["data"]["GenreCollection"]:
self.genres[genre.lower()] = genre
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def post(self, query, variables):
return requests.post(self.url, json={"query": query, "variables": variables})
self.tags = {t["name"].lower(): t["name"] for t in self._request(tag_query, {})["data"]["MediaTagCollection"]}
self.genres = {g.lower(): g for g in self._request(genre_query, {})["data"]["GenreCollection"]}
@retry(stop_max_attempt_number=2, retry_on_exception=util.retry_if_not_failed)
def send_request(self, query, variables):
response = self.post(query, variables)
def _request(self, query, variables):
response = requests.post(self.url, json={"query": query, "variables": variables})
json_obj = response.json()
if "errors" in json_obj:
if json_obj['errors'][0]['message'] == "Too Many Requests.":
@ -51,14 +46,14 @@ class AniListAPI:
time.sleep(0.4)
return json_obj
def anilist_id(self, anilist_id):
def _validate(self, anilist_id):
query = "query ($id: Int) {Media(id: $id) {id title{romaji english}}}"
media = self.send_request(query, {"id": anilist_id})["data"]["Media"]
media = self._request(query, {"id": anilist_id})["data"]["Media"]
if media["id"]:
return media["id"], media["title"]["english" if media["title"]["english"] else "romaji"]
raise Failed(f"AniList Error: No AniList ID found for {anilist_id}")
def get_pagenation(self, query, limit=0, variables=None):
def _pagenation(self, query, limit=0, variables=None):
anilist_ids = []
count = 0
page_num = 0
@ -68,7 +63,7 @@ class AniListAPI:
while next_page:
page_num += 1
variables["page"] = page_num
json_obj = self.send_request(query, variables)
json_obj = self._request(query, variables)
next_page = json_obj["data"]["Page"]["pageInfo"]["hasNextPage"]
for media in json_obj["data"]["Page"]["media"]:
if media["id"]:
@ -80,7 +75,7 @@ class AniListAPI:
break
return anilist_ids
def top_rated(self, limit):
def _top_rated(self, limit):
query = """
query ($page: Int) {
Page(page: $page) {
@ -89,9 +84,9 @@ class AniListAPI:
}
}
"""
return self.get_pagenation(query, limit=limit)
return self._pagenation(query, limit=limit)
def popular(self, limit):
def _popular(self, limit):
query = """
query ($page: Int) {
Page(page: $page) {
@ -100,9 +95,9 @@ class AniListAPI:
}
}
"""
return self.get_pagenation(query, limit=limit)
return self._pagenation(query, limit=limit)
def season(self, season, year, sort, limit):
def _season(self, season, year, sort, limit):
query = """
query ($page: Int, $season: MediaSeason, $year: Int, $sort: [MediaSort]) {
Page(page: $page){
@ -112,9 +107,9 @@ class AniListAPI:
}
"""
variables = {"season": season.upper(), "year": year, "sort": "SCORE_DESC" if sort == "score" else "POPULARITY_DESC"}
return self.get_pagenation(query, limit=limit, variables=variables)
return self._pagenation(query, limit=limit, variables=variables)
def genre(self, genre, sort, limit):
def _genre(self, genre, sort, limit):
query = """
query ($page: Int, $genre: String, $sort: [MediaSort]) {
Page(page: $page){
@ -124,9 +119,9 @@ class AniListAPI:
}
"""
variables = {"genre": genre, "sort": "SCORE_DESC" if sort == "score" else "POPULARITY_DESC"}
return self.get_pagenation(query, limit=limit, variables=variables)
return self._pagenation(query, limit=limit, variables=variables)
def tag(self, tag, sort, limit):
def _tag(self, tag, sort, limit):
query = """
query ($page: Int, $tag: String, $sort: [MediaSort]) {
Page(page: $page){
@ -136,9 +131,9 @@ class AniListAPI:
}
"""
variables = {"tag": tag, "sort": "SCORE_DESC" if sort == "score" else "POPULARITY_DESC"}
return self.get_pagenation(query, limit=limit, variables=variables)
return self._pagenation(query, limit=limit, variables=variables)
def studio(self, studio_id):
def _studio(self, studio_id):
query = """
query ($page: Int, $id: Int) {
Studio(id: $id) {
@ -156,7 +151,7 @@ class AniListAPI:
name = None
while next_page:
page_num += 1
json_obj = self.send_request(query, {"id": studio_id, "page": page_num})
json_obj = self._request(query, {"id": studio_id, "page": page_num})
if not name:
name = json_obj["data"]["Studio"]["name"]
next_page = json_obj["data"]["Studio"]["media"]["pageInfo"]["hasNextPage"]
@ -165,7 +160,7 @@ class AniListAPI:
anilist_ids.append(media["id"])
return anilist_ids, name
def relations(self, anilist_id, ignore_ids=None):
def _relations(self, anilist_id, ignore_ids=None):
query = """
query ($id: Int) {
Media(id: $id) {
@ -182,9 +177,9 @@ class AniListAPI:
name = ""
if not ignore_ids:
ignore_ids = [anilist_id]
anilist_id, name = self.anilist_id(anilist_id)
anilist_id, name = self._validate(anilist_id)
anilist_ids.append(anilist_id)
json_obj = self.send_request(query, {"id": anilist_id})
json_obj = self._request(query, {"id": anilist_id})
edges = [media["node"]["id"] for media in json_obj["data"]["Media"]["relations"]["edges"]
if media["relationType"] not in ["CHARACTER", "OTHER"] and media["node"]["type"] == "ANIME"]
for media in json_obj["data"]["Media"]["relations"]["nodes"]:
@ -194,7 +189,7 @@ class AniListAPI:
anilist_ids.append(media["id"])
for next_id in new_anilist_ids:
new_relation_ids, ignore_ids, _ = self.relations(next_id, ignore_ids=ignore_ids)
new_relation_ids, ignore_ids, _ = self._relations(next_id, ignore_ids=ignore_ids)
anilist_ids.extend(new_relation_ids)
return anilist_ids, ignore_ids, name
@ -215,7 +210,7 @@ class AniListAPI:
if studio: query = "query ($id: Int) {Studio(id: $id) {name}}"
else: query = "query ($id: Int) {Media(id: $id) {id}}"
try:
self.send_request(query, {"id": anilist_id})
self._request(query, {"id": anilist_id})
anilist_values.append(anilist_id)
except Failed as e: logger.error(e)
if len(anilist_values) > 0:
@ -227,29 +222,29 @@ class AniListAPI:
logger.debug(f"Data: {data}")
pretty = util.pretty_names[method] if method in util.pretty_names else method
if method == "anilist_id":
anilist_id, name = self.anilist_id(data)
anilist_id, name = self._validate(data)
anilist_ids = [anilist_id]
if status_message:
logger.info(f"Processing {pretty}: ({data}) {name}")
elif method in ["anilist_popular", "anilist_top_rated"]:
anilist_ids = self.popular(data) if method == "anilist_popular" else self.top_rated(data)
anilist_ids = self._popular(data) if method == "anilist_popular" else self._top_rated(data)
if status_message:
logger.info(f"Processing {pretty}: {data} Anime")
elif method == "anilist_season":
anilist_ids = self.season(data["season"], data["year"], data["sort_by"], data["limit"])
anilist_ids = self._season(data["season"], data["year"], data["sort_by"], data["limit"])
if status_message:
logger.info(f"Processing {pretty}: {data['limit'] if data['limit'] > 0 else 'All'} Anime from {util.pretty_seasons[data['season']]} {data['year']} sorted by {pretty_names[data['sort_by']]}")
elif method == "anilist_genre":
anilist_ids = self.genre(data["genre"], data["sort_by"], data["limit"])
anilist_ids = self._genre(data["genre"], data["sort_by"], data["limit"])
if status_message:
logger.info(f"Processing {pretty}: {data['limit'] if data['limit'] > 0 else 'All'} Anime from the Genre: {data['genre']} sorted by {pretty_names[data['sort_by']]}")
elif method == "anilist_tag":
anilist_ids = self.tag(data["tag"], data["sort_by"], data["limit"])
anilist_ids = self._tag(data["tag"], data["sort_by"], data["limit"])
if status_message:
logger.info(f"Processing {pretty}: {data['limit'] if data['limit'] > 0 else 'All'} Anime from the Tag: {data['tag']} sorted by {pretty_names[data['sort_by']]}")
elif method in ["anilist_studio", "anilist_relations"]:
if method == "anilist_studio": anilist_ids, name = self.studio(data)
else: anilist_ids, _, name = self.relations(data)
if method == "anilist_studio": anilist_ids, name = self._studio(data)
else: anilist_ids, _, name = self._relations(data)
if status_message:
logger.info(f"Processing {pretty}: ({data}) {name} ({len(anilist_ids)} Anime)")
else:

@ -163,7 +163,7 @@ movie_only_filters = [
"writer", "writer.not"
]
def split_attribute(text):
def _split(text):
attribute, modifier = os.path.splitext(str(text).lower())
attribute = method_alias[attribute] if attribute in method_alias else attribute
modifier = modifier_alias[modifier] if modifier in modifier_alias else modifier
@ -463,7 +463,7 @@ class CollectionBuilder:
indent = f"\n{' ' * level}"
conjunction = f"{'and' if is_all else 'or'}=1&"
for smart_key, smart_data in filter_dict.items():
smart, smart_mod, smart_final = split_attribute(smart_key)
smart, smart_mod, smart_final = _split(smart_key)
def build_url_arg(arg, mod=None, arg_s=None, mod_s=None):
arg_key = plex.search_translation[smart] if smart in plex.search_translation else smart
@ -884,7 +884,7 @@ class CollectionBuilder:
elif method_name == "plex_search":
searches = {}
for search_name, search_data in method_data.items():
search, modifier, search_final = split_attribute(search_name)
search, modifier, search_final = _split(search_name)
if search_name != search_final:
logger.warning(f"Collection Warning: {search_name} plex search attribute will run as {search_final}")
if search_final in plex.movie_only_searches and self.library.is_show:

@ -74,26 +74,26 @@ class Cache:
return tmdb_id, tvdb_id
def get_tmdb_id(self, media_type, plex_guid=None, imdb_id=None, tvdb_id=None, anidb_id=None):
return self.get_id_from(media_type, "tmdb_id", plex_guid=plex_guid, imdb_id=imdb_id, tvdb_id=tvdb_id, anidb_id=anidb_id)
return self._id_from(media_type, "tmdb_id", plex_guid=plex_guid, imdb_id=imdb_id, tvdb_id=tvdb_id, anidb_id=anidb_id)
def get_imdb_id(self, media_type, plex_guid=None, tmdb_id=None, tvdb_id=None, anidb_id=None):
return self.get_id_from(media_type, "imdb_id", plex_guid=plex_guid, tmdb_id=tmdb_id, tvdb_id=tvdb_id, anidb_id=anidb_id)
return self._id_from(media_type, "imdb_id", plex_guid=plex_guid, tmdb_id=tmdb_id, tvdb_id=tvdb_id, anidb_id=anidb_id)
def get_tvdb_id(self, media_type, plex_guid=None, tmdb_id=None, imdb_id=None, anidb_id=None):
return self.get_id_from(media_type, "tvdb_id", plex_guid=plex_guid, tmdb_id=tmdb_id, imdb_id=imdb_id, anidb_id=anidb_id)
return self._id_from(media_type, "tvdb_id", plex_guid=plex_guid, tmdb_id=tmdb_id, imdb_id=imdb_id, anidb_id=anidb_id)
def get_anidb_id(self, media_type, plex_guid=None, tmdb_id=None, imdb_id=None, tvdb_id=None):
return self.get_id_from(media_type, "anidb_id", plex_guid=plex_guid, tmdb_id=tmdb_id, imdb_id=imdb_id, tvdb_id=tvdb_id)
return self._id_from(media_type, "anidb_id", plex_guid=plex_guid, tmdb_id=tmdb_id, imdb_id=imdb_id, tvdb_id=tvdb_id)
def get_id_from(self, media_type, id_from, plex_guid=None, tmdb_id=None, imdb_id=None, tvdb_id=None, anidb_id=None):
if plex_guid: return self.get_id(media_type, "plex_guid", id_from, plex_guid)
elif tmdb_id: return self.get_id(media_type, "tmdb_id", id_from, tmdb_id)
elif imdb_id: return self.get_id(media_type, "imdb_id", id_from, imdb_id)
elif tvdb_id: return self.get_id(media_type, "tvdb_id", id_from, tvdb_id)
elif anidb_id: return self.get_id(media_type, "anidb_id", id_from, anidb_id)
def _id_from(self, media_type, id_from, plex_guid=None, tmdb_id=None, imdb_id=None, tvdb_id=None, anidb_id=None):
if plex_guid: return self._id(media_type, "plex_guid", id_from, plex_guid)
elif tmdb_id: return self._id(media_type, "tmdb_id", id_from, tmdb_id)
elif imdb_id: return self._id(media_type, "imdb_id", id_from, imdb_id)
elif tvdb_id: return self._id(media_type, "tvdb_id", id_from, tvdb_id)
elif anidb_id: return self._id(media_type, "anidb_id", id_from, anidb_id)
else: return None, None
def get_id(self, media_type, from_id, to_id, key):
def _id(self, media_type, from_id, to_id, key):
id_to_return = None
expired = None
with sqlite3.connect(self.cache_path) as connection:
@ -160,9 +160,9 @@ class Cache:
cursor.execute("INSERT OR IGNORE INTO imdb_map(imdb_id) VALUES(?)", (imdb_id,))
cursor.execute("UPDATE imdb_map SET t_id = ?, expiration_date = ?, media_type = ? WHERE imdb_id = ?", (tmdb_id if media_type == "movie" else tvdb_id, expiration_date.strftime("%Y-%m-%d"), media_type, imdb_id))
def get_tmdb_from_imdb(self, imdb_id): return self.query_imdb_map("movie", imdb_id)
def get_tvdb_from_imdb(self, imdb_id): return self.query_imdb_map("show", imdb_id)
def query_imdb_map(self, media_type, imdb_id):
def get_tmdb_from_imdb(self, imdb_id): return self._imdb_map("movie", imdb_id)
def get_tvdb_from_imdb(self, imdb_id): return self._imdb_map("show", imdb_id)
def _imdb_map(self, media_type, imdb_id):
id_to_return = None
expired = None
with sqlite3.connect(self.cache_path) as connection:

@ -21,12 +21,12 @@ class IMDbAPI:
imdb_url = imdb_url.strip()
if not imdb_url.startswith(self.urls["list"]) and not imdb_url.startswith(self.urls["search"]) and not imdb_url.startswith(self.urls["keyword"]):
raise Failed(f"IMDb Error: {imdb_url} must begin with either:\n{self.urls['list']} (For Lists)\n{self.urls['search']} (For Searches)\n{self.urls['keyword']} (For Keyword Searches)")
total, _ = self.get_total(self.fix_url(imdb_url), language)
total, _ = self._total(self._fix_url(imdb_url), language)
if total > 0:
return imdb_url
raise Failed(f"IMDb Error: {imdb_url} failed to parse")
def fix_url(self, imdb_url):
def _fix_url(self, imdb_url):
if imdb_url.startswith(self.urls["list"]):
try: list_id = re.search("(\\d+)", str(imdb_url)).group(1)
except AttributeError: raise Failed(f"IMDb Error: Failed to parse List ID from {imdb_url}")
@ -36,10 +36,10 @@ class IMDbAPI:
else:
return imdb_url
def get_total(self, imdb_url, language):
def _total(self, imdb_url, language):
header = {"Accept-Language": language}
if imdb_url.startswith(self.urls["keyword"]):
results = self.send_request(imdb_url, header).xpath("//div[@class='desc']/text()")
results = self._request(imdb_url, header).xpath("//div[@class='desc']/text()")
total = None
for result in results:
if "title" in result:
@ -52,15 +52,15 @@ class IMDbAPI:
raise Failed(f"IMDb Error: No Results at URL: {imdb_url}")
return total, 50
else:
try: results = self.send_request(imdb_url, header).xpath("//div[@class='desc']/span/text()")[0].replace(",", "")
try: results = self._request(imdb_url, header).xpath("//div[@class='desc']/span/text()")[0].replace(",", "")
except IndexError: raise Failed(f"IMDb Error: Failed to parse URL: {imdb_url}")
try: total = int(re.findall("(\\d+) title", results)[0])
except IndexError: raise Failed(f"IMDb Error: No Results at URL: {imdb_url}")
return total, 250
def get_imdb_ids_from_url(self, imdb_url, language, limit):
current_url = self.fix_url(imdb_url)
total, item_count = self.get_total(current_url, language)
def _ids_from_url(self, imdb_url, language, limit):
current_url = self._fix_url(imdb_url)
total, item_count = self._total(current_url, language)
header = {"Accept-Language": language}
length = 0
imdb_ids = []
@ -76,9 +76,9 @@ class IMDbAPI:
start_num = (i - 1) * item_count + 1
length = util.print_return(length, f"Parsing Page {i}/{num_of_pages} {start_num}-{limit if i == num_of_pages else i * item_count}")
if imdb_url.startswith(self.urls["keyword"]):
response = self.send_request(f"{current_url}&page={i}", header)
response = self._request(f"{current_url}&page={i}", header)
else:
response = self.send_request(f"{current_url}&count={remainder if i == num_of_pages else item_count}&start={start_num}", header)
response = self._request(f"{current_url}&count={remainder if i == num_of_pages else item_count}&start={start_num}", header)
if imdb_url.startswith(self.urls["keyword"]) and i == num_of_pages:
imdb_ids.extend(response.xpath("//div[contains(@class, 'lister-item-image')]//a/img//@data-tconst")[:remainder])
else:
@ -88,7 +88,7 @@ class IMDbAPI:
else: raise Failed(f"IMDb Error: No IMDb IDs Found at {imdb_url}")
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_request(self, url, header):
def _request(self, url, header):
return html.fromstring(requests.get(url, headers=header).content)
def get_items(self, method, data, language, status_message=True):
@ -107,7 +107,7 @@ class IMDbAPI:
if status_message:
status = f"{data['limit']} Items at " if data['limit'] > 0 else ''
logger.info(f"Processing {pretty}: {status}{data['url']}")
imdb_ids = self.get_imdb_ids_from_url(data["url"], language, data["limit"])
imdb_ids = self._ids_from_url(data["url"], language, data["limit"])
total_ids = len(imdb_ids)
length = 0
for i, imdb_id in enumerate(imdb_ids, 1):

@ -14,15 +14,11 @@ class LetterboxdAPI:
self.url = "https://letterboxd.com"
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_request(self, url, language):
def _request(self, url, language):
return html.fromstring(requests.get(url, headers={"Accept-Language": language, "User-Agent": "Mozilla/5.0 x64"}).content)
def get_list_description(self, list_url, language):
descriptions = self.send_request(list_url, language).xpath("//meta[@property='og:description']/@content")
return descriptions[0] if len(descriptions) > 0 and len(descriptions[0]) > 0 else None
def parse_list(self, list_url, language):
response = self.send_request(list_url, language)
def _parse_list(self, list_url, language):
response = self._request(list_url, language)
letterboxd_ids = response.xpath("//div[@class='poster film-poster really-lazy-load']/@data-film-id")
items = []
for letterboxd_id in letterboxd_ids:
@ -30,14 +26,11 @@ class LetterboxdAPI:
items.append((letterboxd_id, slugs[0]))
next_url = response.xpath("//a[@class='next']/@href")
if len(next_url) > 0:
items.extend(self.parse_list(f"{self.url}{next_url[0]}", language))
items.extend(self._parse_list(f"{self.url}{next_url[0]}", language))
return items
def get_tmdb_from_slug(self, slug, language):
return self.get_tmdb(f"{self.url}{slug}", language)
def get_tmdb(self, letterboxd_url, language):
response = self.send_request(letterboxd_url, language)
def _tmdb(self, letterboxd_url, language):
response = self._request(letterboxd_url, language)
ids = response.xpath("//a[@data-track-action='TMDb']/@href")
if len(ids) > 0 and ids[0]:
if "themoviedb.org/movie" in ids[0]:
@ -45,12 +38,16 @@ class LetterboxdAPI:
raise Failed(f"Letterboxd Error: TMDb Movie ID not found in {ids[0]}")
raise Failed(f"Letterboxd Error: TMDb Movie ID not found at {letterboxd_url}")
def get_list_description(self, list_url, language):
descriptions = self._request(list_url, language).xpath("//meta[@property='og:description']/@content")
return descriptions[0] if len(descriptions) > 0 and len(descriptions[0]) > 0 else None
def get_items(self, method, data, language, status_message=True):
pretty = util.pretty_names[method] if method in util.pretty_names else method
movie_ids = []
if status_message:
logger.info(f"Processing {pretty}: {data}")
items = self.parse_list(data, language)
items = self._parse_list(data, language)
total_items = len(items)
if total_items == 0:
raise Failed(f"Letterboxd Error: No List Items found in {data}")
@ -63,7 +60,7 @@ class LetterboxdAPI:
tmdb_id, expired = self.config.Cache.query_letterboxd_map(item[0])
if not tmdb_id or expired is not False:
try:
tmdb_id = self.get_tmdb_from_slug(item[1], language)
tmdb_id = self._tmdb(f"{self.url}{item[1]}", language)
except Failed as e:
logger.error(e)
continue

@ -87,11 +87,11 @@ class MyAnimeListAPI:
self.client_secret = params["client_secret"]
self.config_path = params["config_path"]
self.authorization = authorization
if not self.save_authorization(self.authorization):
if not self.refresh_authorization():
self.get_authorization()
if not self._save(self.authorization):
if not self._refresh():
self._authorization()
def get_authorization(self):
def _authorization(self):
code_verifier = secrets.token_urlsafe(100)[:128]
url = f"{self.urls['oauth_authorize']}?response_type=code&client_id={self.client_id}&code_challenge={code_verifier}"
logger.info("")
@ -114,21 +114,21 @@ class MyAnimeListAPI:
"code_verifier": code_verifier,
"grant_type": "authorization_code"
}
new_authorization = self.oauth_request(data)
new_authorization = self._oauth(data)
if "error" in new_authorization:
raise Failed("MyAnimeList Error: Invalid code")
if not self.save_authorization(new_authorization):
if not self._save(new_authorization):
raise Failed("MyAnimeList Error: New Authorization Failed")
def check_authorization(self, authorization):
def _check(self, authorization):
try:
self.send_request(self.urls["suggestions"], authorization=authorization)
self._request(self.urls["suggestions"], authorization=authorization)
return True
except Failed as e:
logger.debug(e)
return False
def refresh_authorization(self):
def _refresh(self):
if self.authorization and "refresh_token" in self.authorization and self.authorization["refresh_token"]:
logger.info("Refreshing Access Token...")
data = {
@ -137,12 +137,12 @@ class MyAnimeListAPI:
"refresh_token": self.authorization["refresh_token"],
"grant_type": "refresh_token"
}
refreshed_authorization = self.oauth_request(data)
return self.save_authorization(refreshed_authorization)
refreshed_authorization = self._oauth(data)
return self._save(refreshed_authorization)
return False
def save_authorization(self, authorization):
if authorization is not None and "access_token" in authorization and authorization["access_token"] and self.check_authorization(authorization):
def _save(self, authorization):
if authorization is not None and "access_token" in authorization and authorization["access_token"] and self._check(authorization):
if self.authorization != authorization:
yaml.YAML().allow_duplicate_keys = True
config, ind, bsi = yaml.util.load_yaml_guess_indent(open(self.config_path))
@ -159,39 +159,39 @@ class MyAnimeListAPI:
return False
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def oauth_request(self, data):
def _oauth(self, data):
return requests.post(self.urls["oauth_token"], data).json()
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def send_request(self, url, authorization=None):
def _request(self, url, authorization=None):
new_authorization = authorization if authorization else self.authorization
response = requests.get(url, headers={"Authorization": f"Bearer {new_authorization['access_token']}"}).json()
if "error" in response: raise Failed(f"MyAnimeList Error: {response['error']}")
else: return response
def request_and_parse_mal_ids(self, url):
data = self.send_request(url)
def _parse_request(self, url):
data = self._request(url)
return [d["node"]["id"] for d in data["data"]] if "data" in data else []
def get_username(self):
return self.send_request(f"{self.urls['user']}/@me")["name"]
def _username(self):
return self._request(f"{self.urls['user']}/@me")["name"]
def get_ranked(self, ranking_type, limit):
def _ranked(self, ranking_type, limit):
url = f"{self.urls['ranking']}?ranking_type={ranking_type}&limit={limit}"
return self.request_and_parse_mal_ids(url)
return self._parse_request(url)
def get_season(self, season, year, sort_by, limit):
def _season(self, season, year, sort_by, limit):
url = f"{self.urls['season']}/{year}/{season}?sort={sort_by}&limit={limit}"
return self.request_and_parse_mal_ids(url)
return self._parse_request(url)
def get_suggestions(self, limit):
def _suggestions(self, limit):
url = f"{self.urls['suggestions']}?limit={limit}"
return self.request_and_parse_mal_ids(url)
return self._parse_request(url)
def get_userlist(self, username, status, sort_by, limit):
def _userlist(self, username, status, sort_by, limit):
final_status = "" if status == "all" else f"status={status}&"
url = f"{self.urls['user']}/{username}/animelist?{final_status}sort={sort_by}&limit={limit}"
return self.request_and_parse_mal_ids(url)
return self._parse_request(url)
def get_items(self, method, data, language, status_message=True):
if status_message:
@ -202,21 +202,21 @@ class MyAnimeListAPI:
if status_message:
logger.info(f"Processing {pretty}: {data}")
elif method in mal_ranked_name:
mal_ids = self.get_ranked(mal_ranked_name[method], data)
mal_ids = self._ranked(mal_ranked_name[method], data)
if status_message:
logger.info(f"Processing {pretty}: {data} Anime")
elif method == "mal_season":
mal_ids = self.get_season(data["season"], data["year"], data["sort_by"], data["limit"])
mal_ids = self._season(data["season"], data["year"], data["sort_by"], data["limit"])
if status_message:
logger.info(f"Processing {pretty}: {data['limit']} Anime from {util.pretty_seasons[data['season']]} {data['year']} sorted by {pretty_names[data['sort_by']]}")
elif method == "mal_suggested":
mal_ids = self.get_suggestions(data)
mal_ids = self._suggestions(data)
if status_message:
logger.info(f"Processing {pretty}: {data} Anime")
elif method == "mal_userlist":
mal_ids = self.get_userlist(data["username"], data["status"], data["sort_by"], data["limit"])
mal_ids = self._userlist(data["username"], data["status"], data["sort_by"], data["limit"])
if status_message:
logger.info(f"Processing {pretty}: {data['limit']} Anime from {self.get_username() if data['username'] == '@me' else data['username']}'s {pretty_names[data['status']]} list sorted by {pretty_names[data['sort_by']]}")
logger.info(f"Processing {pretty}: {data['limit']} Anime from {self._username() if data['username'] == '@me' else data['username']}'s {pretty_names[data['status']]} list sorted by {pretty_names[data['sort_by']]}")
else:
raise Failed(f"MyAnimeList Error: Method {method} not supported")
movie_ids, show_ids = self.config.Arms.myanimelist_to_ids(mal_ids, language)

@ -38,7 +38,7 @@ class RadarrAPI:
def get_profile_id(self, profile_name):
profiles = ""
for profile in self.send_get("qualityProfile" if self.version == "v3" else "profile"):
for profile in self._get("qualityProfile" if self.version == "v3" else "profile"):
if len(profiles) > 0:
profiles += ", "
profiles += profile["name"]
@ -47,19 +47,19 @@ class RadarrAPI:
raise Failed(f"Radarr Error: quality_profile: {profile_name} does not exist in radarr. Profiles available: {profiles}")
def get_tags(self):
return {tag["label"]: tag["id"] for tag in self.send_get("tag")}
return {tag["label"]: tag["id"] for tag in self._get("tag")}
def add_tags(self, tags):
added = False
for label in tags:
if str(label).lower() not in self.tags:
added = True
self.send_post("tag", {"label": str(label).lower()})
self._post("tag", {"label": str(label).lower()})
if added:
self.tags = self.get_tags()
def lookup(self, tmdb_id):
results = self.send_get("movie/lookup", params={"term": f"tmdb:{tmdb_id}"})
results = self._get("movie/lookup", params={"term": f"tmdb:{tmdb_id}"})
if results:
return results[0]
else:
@ -105,7 +105,7 @@ class RadarrAPI:
}
if tag_nums:
url_json["tags"] = tag_nums
response = self.send_post("movie", url_json)
response = self._post("movie", url_json)
if response.status_code < 400:
logger.info(f"Added to Radarr | {tmdb_id:<6} | {movie_info['title']}")
add_count += 1
@ -118,7 +118,7 @@ class RadarrAPI:
logger.info(f"{add_count} Movie{'s' if add_count > 1 else ''} added to Radarr")
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_get(self, url, params=None):
def _get(self, url, params=None):
url_params = {"apikey": f"{self.token}"}
if params:
for param in params:
@ -126,5 +126,5 @@ class RadarrAPI:
return requests.get(f"{self.base_url}{url}", params=url_params).json()
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_post(self, url, url_json):
def _post(self, url, url_json):
return requests.post(f"{self.base_url}{url}", json=url_json, params={"apikey": f"{self.token}"})

@ -58,7 +58,7 @@ class SonarrAPI:
endpoint = "languageProfile"
else:
endpoint = "profile"
for profile in self.send_get(endpoint):
for profile in self._get(endpoint):
if len(profiles) > 0:
profiles += ", "
profiles += profile["name"]
@ -67,19 +67,19 @@ class SonarrAPI:
raise Failed(f"Sonarr Error: {profile_type}: {profile_name} does not exist in sonarr. Profiles available: {profiles}")
def get_tags(self):
return {tag["label"]: tag["id"] for tag in self.send_get("tag")}
return {tag["label"]: tag["id"] for tag in self._get("tag")}
def add_tags(self, tags):
added = False
for label in tags:
if str(label).lower() not in self.tags:
added = True
self.send_post("tag", {"label": str(label).lower()})
self._post("tag", {"label": str(label).lower()})
if added:
self.tags = self.get_tags()
def lookup(self, tvdb_id):
results = self.send_get("series/lookup", params={"term": f"tvdb:{tvdb_id}"})
results = self._get("series/lookup", params={"term": f"tvdb:{tvdb_id}"})
if results:
return results[0]
else:
@ -135,7 +135,7 @@ class SonarrAPI:
}
if tag_nums:
url_json["tags"] = tag_nums
response = self.send_post("series", url_json)
response = self._post("series", url_json)
if response.status_code < 400:
logger.info(f"Added to Sonarr | {tvdb_id:<6} | {show_info['title']}")
add_count += 1
@ -152,7 +152,7 @@ class SonarrAPI:
logger.info(f"{add_count} Show{'s' if add_count > 1 else ''} added to Sonarr")
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_get(self, url, params=None):
def _get(self, url, params=None):
url_params = {"apikey": f"{self.token}"}
if params:
for param in params:
@ -160,5 +160,5 @@ class SonarrAPI:
return requests.get(f"{self.base_url}{url}", params=url_params).json()
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_post(self, url, url_json):
def _post(self, url, url_json):
return requests.post(f"{self.base_url}{url}", json=url_json, params={"apikey": f"{self.token}"})

@ -19,16 +19,10 @@ class TautulliAPI:
self.url = params["url"]
self.apikey = params["apikey"]
def get_popular(self, library, time_range=30, stats_count=20, stats_count_buffer=20, status_message=True):
return self.get_items(library, time_range=time_range, stats_count=stats_count, list_type="popular", stats_count_buffer=stats_count_buffer, status_message=status_message)
def get_top(self, library, time_range=30, stats_count=20, stats_count_buffer=20, status_message=True):
return self.get_items(library, time_range=time_range, stats_count=stats_count, list_type="top", stats_count_buffer=stats_count_buffer, status_message=status_message)
def get_items(self, library, time_range=30, stats_count=20, list_type="popular", stats_count_buffer=20, status_message=True):
if status_message:
logger.info(f"Processing Tautulli Most {'Popular' if list_type == 'popular' else 'Watched'}: {stats_count} {'Movies' if library.is_movie else 'Shows'}")
response = self.send_request(f"{self.url}/api/v2?apikey={self.apikey}&cmd=get_home_stats&time_range={time_range}&stats_count={int(stats_count) + int(stats_count_buffer)}")
response = self._request(f"{self.url}/api/v2?apikey={self.apikey}&cmd=get_home_stats&time_range={time_range}&stats_count={int(stats_count) + int(stats_count_buffer)}")
stat_id = f"{'popular' if list_type == 'popular' else 'top'}_{'movies' if library.is_movie else 'tv'}"
items = None
@ -39,7 +33,7 @@ class TautulliAPI:
if items is None:
raise Failed("Tautulli Error: No Items found in the response")
section_id = self.get_section_id(library.name)
section_id = self._section_id(library.name)
rating_keys = []
count = 0
for item in items:
@ -48,8 +42,8 @@ class TautulliAPI:
count += 1
return rating_keys
def get_section_id(self, library_name):
response = self.send_request(f"{self.url}/api/v2?apikey={self.apikey}&cmd=get_library_names")
def _section_id(self, library_name):
response = self._request(f"{self.url}/api/v2?apikey={self.apikey}&cmd=get_library_names")
section_id = None
for entry in response["response"]["data"]:
if entry["section_name"] == library_name:
@ -59,6 +53,6 @@ class TautulliAPI:
else: raise Failed(f"Tautulli Error: No Library named {library_name} in the response")
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_request(self, url):
def _request(self, url):
logger.debug(f"Tautulli URL: {url.replace(self.apikey, '################################')}")
return requests.get(url).json()

@ -131,7 +131,7 @@ class TMDbAPI:
self.image_url = "https://image.tmdb.org/t/p/original"
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def convert_from_tmdb(self, tmdb_id, convert_to, is_movie):
def _from_tmdb(self, tmdb_id, convert_to, is_movie):
try:
id_to_return = self.Movie.external_ids(tmdb_id)[convert_to] if is_movie else self.TV.external_ids(tmdb_id)[convert_to]
if not id_to_return or (convert_to == "tvdb_id" and id_to_return == 0):
@ -141,16 +141,16 @@ class TMDbAPI:
raise Failed(f"TMDb Error: {'Movie' if is_movie else 'Show'} TMDb ID: {tmdb_id} not found")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def convert_to_tmdb(self, external_id, external_source, is_movie):
def _to_tmdb(self, external_id, external_source, is_movie):
search_results = self.Movie.external(external_id=external_id, external_source=external_source)
search = search_results["movie_results" if is_movie else "tv_results"]
if len(search) == 1: return int(search[0]["id"])
else: raise Failed(f"TMDb Error: No TMDb ID found for {external_source.upper().replace('B_', 'b ')} {external_id}")
def convert_tmdb_to_imdb(self, tmdb_id, is_movie=True): return self.convert_from_tmdb(tmdb_id, "imdb_id", is_movie)
def convert_imdb_to_tmdb(self, imdb_id, is_movie=True): return self.convert_to_tmdb(imdb_id, "imdb_id", is_movie)
def convert_tmdb_to_tvdb(self, tmdb_id): return self.convert_from_tmdb(tmdb_id, "tvdb_id", False)
def convert_tvdb_to_tmdb(self, tvdb_id): return self.convert_to_tmdb(tvdb_id, "tvdb_id", False)
def convert_tmdb_to_imdb(self, tmdb_id, is_movie=True): return self._from_tmdb(tmdb_id, "imdb_id", is_movie)
def convert_imdb_to_tmdb(self, imdb_id, is_movie=True): return self._to_tmdb(imdb_id, "imdb_id", is_movie)
def convert_tmdb_to_tvdb(self, tmdb_id): return self._from_tmdb(tmdb_id, "tvdb_id", False)
def convert_tvdb_to_tmdb(self, tvdb_id): return self._to_tmdb(tvdb_id, "tvdb_id", False)
def convert_tvdb_to_imdb(self, tvdb_id): return self.convert_tmdb_to_imdb(self.convert_tvdb_to_tmdb(tvdb_id), False)
def convert_imdb_to_tvdb(self, imdb_id): return self.convert_tmdb_to_tvdb(self.convert_imdb_to_tmdb(imdb_id, False))
@ -183,22 +183,22 @@ class TMDbAPI:
except TMDbException as e: raise Failed(f"TMDb Error: No Person found for TMDb ID {tmdb_id}: {e}")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def get_person_credits(self, tmdb_id):
def _person_credits(self, tmdb_id):
try: return self.Person.combined_credits(tmdb_id)
except TMDbException as e: raise Failed(f"TMDb Error: No Person found for TMDb ID {tmdb_id}: {e}")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def get_company(self, tmdb_id):
def _company(self, tmdb_id):
try: return self.Company.details(tmdb_id)
except TMDbException as e: raise Failed(f"TMDb Error: No Company found for TMDb ID {tmdb_id}: {e}")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def get_network(self, tmdb_id):
def _network(self, tmdb_id):
try: return self.Network.details(tmdb_id)
except TMDbException as e: raise Failed(f"TMDb Error: No Network found for TMDb ID {tmdb_id}: {e}")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def get_keyword(self, tmdb_id):
def _keyword(self, tmdb_id):
try: return self.Keyword.details(tmdb_id)
except TMDbException as e: raise Failed(f"TMDb Error: No Keyword found for TMDb ID {tmdb_id}: {e}")
@ -207,10 +207,10 @@ class TMDbAPI:
try: return self.List.details(tmdb_id, all_details=True)
except TMDbException as e: raise Failed(f"TMDb Error: No List found for TMDb ID {tmdb_id}: {e}")
def get_credits(self, tmdb_id, actor=False, crew=False, director=False, producer=False, writer=False):
def _credits(self, tmdb_id, actor=False, crew=False, director=False, producer=False, writer=False):
movie_ids = []
show_ids = []
actor_credits = self.get_person_credits(tmdb_id)
actor_credits = self._person_credits(tmdb_id)
if actor:
for credit in actor_credits.cast:
if credit.media_type == "movie":
@ -234,7 +234,7 @@ class TMDbAPI:
logger.warning(e)
return movie_ids, show_ids
def get_pagenation(self, method, amount, is_movie):
def _pagenation(self, method, amount, is_movie):
ids = []
count = 0
for x in range(int(amount / 20) + 1):
@ -254,7 +254,7 @@ class TMDbAPI:
if count == amount: break
return ids
def get_discover(self, attrs, amount, is_movie):
def _discover(self, attrs, amount, is_movie):
ids = []
count = 0
for date_attr in discover_dates:
@ -278,6 +278,24 @@ class TMDbAPI:
if count == amount: break
return ids, amount
def validate_tmdb_list(self, tmdb_list, tmdb_type):
tmdb_values = []
for tmdb_id in tmdb_list:
try: tmdb_values.append(self.validate_tmdb(tmdb_id, tmdb_type))
except Failed as e: logger.error(e)
if len(tmdb_values) == 0: raise Failed(f"TMDb Error: No valid TMDb IDs in {tmdb_list}")
return tmdb_values
def validate_tmdb(self, tmdb_id, tmdb_type):
if tmdb_type == "Movie": self.get_movie(tmdb_id)
elif tmdb_type == "Show": self.get_show(tmdb_id)
elif tmdb_type == "Collection": self.get_collection(tmdb_id)
elif tmdb_type == "Person": self.get_person(tmdb_id)
elif tmdb_type == "Company": self._company(tmdb_id)
elif tmdb_type == "Network": self._network(tmdb_id)
elif tmdb_type == "List": self.get_list(tmdb_id)
return tmdb_id
def get_items(self, method, data, is_movie, status_message=True):
if status_message:
logger.debug(f"Data: {data}")
@ -292,20 +310,20 @@ class TMDbAPI:
if method in ["tmdb_company", "tmdb_network", "tmdb_keyword"]:
tmdb_id = int(data)
if method == "tmdb_company":
tmdb_name = str(self.get_company(tmdb_id).name)
tmdb_name = str(self._company(tmdb_id).name)
attrs = {"with_companies": tmdb_id}
elif method == "tmdb_network":
tmdb_name = str(self.get_network(tmdb_id).name)
tmdb_name = str(self._network(tmdb_id).name)
attrs = {"with_networks": tmdb_id}
elif method == "tmdb_keyword":
tmdb_name = str(self.get_keyword(tmdb_id).name)
tmdb_name = str(self._keyword(tmdb_id).name)
attrs = {"with_keywords": tmdb_id}
limit = 0
else:
attrs = data.copy()
limit = int(attrs.pop("limit"))
if is_movie: movie_ids, amount = self.get_discover(attrs, limit, is_movie)
else: show_ids, amount = self.get_discover(attrs, limit, is_movie)
if is_movie: movie_ids, amount = self._discover(attrs, limit, is_movie)
else: show_ids, amount = self._discover(attrs, limit, is_movie)
if status_message:
if method in ["tmdb_company", "tmdb_network", "tmdb_keyword"]:
logger.info(f"Processing {pretty}: ({tmdb_id}) {tmdb_name} ({amount} {media_type}{'' if amount == 1 else 's'})")
@ -314,8 +332,8 @@ class TMDbAPI:
for attr, value in attrs.items():
logger.info(f" {attr}: {value}")
elif method in ["tmdb_popular", "tmdb_top_rated", "tmdb_now_playing", "tmdb_trending_daily", "tmdb_trending_weekly"]:
if is_movie: movie_ids = self.get_pagenation(method, data, is_movie)
else: show_ids = self.get_pagenation(method, data, is_movie)
if is_movie: movie_ids = self._pagenation(method, data, is_movie)
else: show_ids = self._pagenation(method, data, is_movie)
if status_message:
logger.info(f"Processing {pretty}: {data} {media_type}{'' if data == 1 else 's'}")
else:
@ -342,11 +360,11 @@ class TMDbAPI:
show_ids.append(self.convert_tmdb_to_tvdb(tmdb_id))
else:
tmdb_name = str(self.get_person(tmdb_id).name)
if method == "tmdb_actor": movie_ids, show_ids = self.get_credits(tmdb_id, actor=True)
elif method == "tmdb_director": movie_ids, show_ids = self.get_credits(tmdb_id, director=True)
elif method == "tmdb_producer": movie_ids, show_ids = self.get_credits(tmdb_id, producer=True)
elif method == "tmdb_writer": movie_ids, show_ids = self.get_credits(tmdb_id, writer=True)
elif method == "tmdb_crew": movie_ids, show_ids = self.get_credits(tmdb_id, crew=True)
if method == "tmdb_actor": movie_ids, show_ids = self._credits(tmdb_id, actor=True)
elif method == "tmdb_director": movie_ids, show_ids = self._credits(tmdb_id, director=True)
elif method == "tmdb_producer": movie_ids, show_ids = self._credits(tmdb_id, producer=True)
elif method == "tmdb_writer": movie_ids, show_ids = self._credits(tmdb_id, writer=True)
elif method == "tmdb_crew": movie_ids, show_ids = self._credits(tmdb_id, crew=True)
else: raise Failed(f"TMDb Error: Method {method} not supported")
if status_message and len(movie_ids) > 0:
logger.info(f"Processing {pretty}: ({tmdb_id}) {tmdb_name} ({len(movie_ids)} Movie{'' if len(movie_ids) == 1 else 's'})")
@ -356,21 +374,3 @@ class TMDbAPI:
logger.debug(f"TMDb IDs Found: {movie_ids}")
logger.debug(f"TVDb IDs Found: {show_ids}")
return movie_ids, show_ids
def validate_tmdb_list(self, tmdb_list, tmdb_type):
tmdb_values = []
for tmdb_id in tmdb_list:
try: tmdb_values.append(self.validate_tmdb(tmdb_id, tmdb_type))
except Failed as e: logger.error(e)
if len(tmdb_values) == 0: raise Failed(f"TMDb Error: No valid TMDb IDs in {tmdb_list}")
return tmdb_values
def validate_tmdb(self, tmdb_id, tmdb_type):
if tmdb_type == "Movie": self.get_movie(tmdb_id)
elif tmdb_type == "Show": self.get_show(tmdb_id)
elif tmdb_type == "Collection": self.get_collection(tmdb_id)
elif tmdb_type == "Person": self.get_person(tmdb_id)
elif tmdb_type == "Company": self.get_company(tmdb_id)
elif tmdb_type == "Network": self.get_network(tmdb_id)
elif tmdb_type == "List": self.get_list(tmdb_id)
return tmdb_id

@ -37,11 +37,11 @@ class TraktAPI:
self.config_path = params["config_path"]
self.authorization = authorization
Trakt.configuration.defaults.client(self.client_id, self.client_secret)
if not self.save_authorization(self.authorization):
if not self.refresh_authorization():
self.get_authorization()
if not self._save(self.authorization):
if not self._refresh():
self._authorization()
def get_authorization(self):
def _authorization(self):
url = Trakt["oauth"].authorize_url(self.redirect_uri)
logger.info(f"Navigate to: {url}")
logger.info("If you get an OAuth error your client_id or client_secret is invalid")
@ -52,10 +52,10 @@ class TraktAPI:
new_authorization = Trakt["oauth"].token(pin, self.redirect_uri)
if not new_authorization:
raise Failed("Trakt Error: Invalid trakt pin. If you're sure you typed it in correctly your client_id or client_secret may be invalid")
if not self.save_authorization(new_authorization):
if not self._save(new_authorization):
raise Failed("Trakt Error: New Authorization Failed")
def check_authorization(self, authorization):
def _check(self, authorization):
try:
with Trakt.configuration.oauth.from_response(authorization, refresh=True):
if Trakt["users/settings"].get():
@ -63,15 +63,15 @@ class TraktAPI:
except ValueError: pass
return False
def refresh_authorization(self):
def _refresh(self):
if self.authorization and "refresh_token" in self.authorization and self.authorization["refresh_token"]:
logger.info("Refreshing Access Token...")
refreshed_authorization = Trakt["oauth"].token_refresh(self.authorization["refresh_token"], self.redirect_uri)
return self.save_authorization(refreshed_authorization)
return self._save(refreshed_authorization)
return False
def save_authorization(self, authorization):
if authorization and self.check_authorization(authorization):
def _save(self, authorization):
if authorization and self._check(authorization):
if self.authorization != authorization:
yaml.YAML().allow_duplicate_keys = True
config, ind, bsi = yaml.util.load_yaml_guess_indent(open(self.config_path))
@ -90,15 +90,15 @@ class TraktAPI:
return True
return False
def convert_tmdb_to_imdb(self, tmdb_id, is_movie=True): return self.convert_id(tmdb_id, "tmdb", "imdb", "movie" if is_movie else "show")
def convert_imdb_to_tmdb(self, imdb_id, is_movie=True): return self.convert_id(imdb_id, "imdb", "tmdb", "movie" if is_movie else "show")
def convert_tmdb_to_tvdb(self, tmdb_id): return self.convert_id(tmdb_id, "tmdb", "tvdb", "show")
def convert_tvdb_to_tmdb(self, tvdb_id): return self.convert_id(tvdb_id, "tvdb", "tmdb", "show")
def convert_tvdb_to_imdb(self, tvdb_id): return self.convert_id(tvdb_id, "tvdb", "imdb", "show")
def convert_imdb_to_tvdb(self, imdb_id): return self.convert_id(imdb_id, "imdb", "tvdb", "show")
def convert_tmdb_to_imdb(self, tmdb_id, is_movie=True): return self._convert(tmdb_id, "tmdb", "imdb", "movie" if is_movie else "show")
def convert_imdb_to_tmdb(self, imdb_id, is_movie=True): return self._convert(imdb_id, "imdb", "tmdb", "movie" if is_movie else "show")
def convert_tmdb_to_tvdb(self, tmdb_id): return self._convert(tmdb_id, "tmdb", "tvdb", "show")
def convert_tvdb_to_tmdb(self, tvdb_id): return self._convert(tvdb_id, "tvdb", "tmdb", "show")
def convert_tvdb_to_imdb(self, tvdb_id): return self._convert(tvdb_id, "tvdb", "imdb", "show")
def convert_imdb_to_tvdb(self, imdb_id): return self._convert(imdb_id, "imdb", "tvdb", "show")
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def convert_id(self, external_id, from_source, to_source, media_type):
def _convert(self, external_id, from_source, to_source, media_type):
lookup = Trakt["search"].lookup(external_id, from_source, media_type)
if lookup:
lookup = lookup[0] if isinstance(lookup, list) else lookup
@ -107,13 +107,13 @@ class TraktAPI:
raise Failed(f"No {to_source.upper().replace('B', 'b')} ID found for {from_source.upper().replace('B', 'b')} ID {external_id}")
def collection(self, data, is_movie):
return self.user_list("collection", data, is_movie)
return self._user_list("collection", data, is_movie)
def watchlist(self, data, is_movie):
return self.user_list("watchlist", data, is_movie)
def _watchlist(self, data, is_movie):
return self._user_list("watchlist", data, is_movie)
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
def user_list(self, list_type, data, is_movie):
def _user_list(self, list_type, data, is_movie):
items = Trakt[f"users/{data}/{list_type}"].movies() if is_movie else Trakt[f"users/{data}/{list_type}"].shows()
if items is None: raise Failed("Trakt Error: No List found")
else: return [i for i in items]
@ -126,16 +126,16 @@ class TraktAPI:
else: return trakt_list
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_request(self, url):
def _request(self, url):
return requests.get(url, headers={"Content-Type": "application/json", "trakt-api-version": "2", "trakt-api-key": self.client_id}).json()
def get_collection(self, username, is_movie):
items = self.send_request(f"{self.base_url}/users/{username}/collection/{'movies' if is_movie else 'shows'}")
def _collection(self, username, is_movie):
items = self._request(f"{self.base_url}/users/{username}/collection/{'movies' if is_movie else 'shows'}")
if is_movie: return [item["movie"]["ids"]["tmdb"] for item in items], []
else: return [], [item["show"]["ids"]["tvdb"] for item in items]
def get_pagenation(self, pagenation, amount, is_movie):
items = self.send_request(f"{self.base_url}/{'movies' if is_movie else 'shows'}/{pagenation}?limit={amount}")
def _pagenation(self, pagenation, amount, is_movie):
items = self._request(f"{self.base_url}/{'movies' if is_movie else 'shows'}/{pagenation}?limit={amount}")
if pagenation == "popular" and is_movie: return [item["ids"]["tmdb"] for item in items], []
elif pagenation == "popular": return [], [item["ids"]["tvdb"] for item in items]
elif is_movie: return [item["movie"]["ids"]["tmdb"] for item in items], []
@ -146,9 +146,9 @@ class TraktAPI:
for value in values:
try:
if trakt_type == "watchlist" and is_movie is not None:
self.watchlist(value, is_movie)
self._watchlist(value, is_movie)
elif trakt_type == "collection" and is_movie is not None:
self.get_collection(value, is_movie)
self._collection(value, is_movie)
else:
self.standard_list(value)
trakt_values.append(value)
@ -169,24 +169,27 @@ class TraktAPI:
pretty = self.aliases[method] if method in self.aliases else method
media_type = "Movie" if is_movie else "Show"
if method in ["trakt_trending", "trakt_popular", "trakt_recommended", "trakt_watched", "trakt_collected"]:
movie_ids, show_ids = self.get_pagenation(method[6:], data, is_movie)
movie_ids, show_ids = self._pagenation(method[6:], data, is_movie)
if status_message:
logger.info(f"Processing {pretty}: {data} {media_type}{'' if data == 1 else 's'}")
elif method == "trakt_collection":
movie_ids, show_ids = self.get_collection(data, is_movie)
movie_ids, show_ids = self._collection(data, is_movie)
if status_message:
logger.info(f"Processing {pretty} {media_type}s for {data}")
else:
show_ids = []
movie_ids = []
if method == "trakt_watchlist": trakt_items = self.watchlist(data, is_movie)
if method == "trakt_watchlist": trakt_items = self._watchlist(data, is_movie)
elif method == "trakt_list": trakt_items = self.standard_list(data).items()
else: raise Failed(f"Trakt Error: Method {method} not supported")
if status_message: logger.info(f"Processing {pretty}: {data}")
for trakt_item in trakt_items:
if isinstance(trakt_item, Movie): movie_ids.append(int(trakt_item.get_key("tmdb")))
elif isinstance(trakt_item, Show) and trakt_item.pk[1] not in show_ids: show_ids.append(int(trakt_item.pk[1]))
elif (isinstance(trakt_item, (Season, Episode))) and trakt_item.show.pk[1] not in show_ids: show_ids.append(int(trakt_item.show.pk[1]))
if isinstance(trakt_item, Movie):
movie_ids.append(int(trakt_item.get_key("tmdb")))
elif isinstance(trakt_item, Show) and trakt_item.pk[1] not in show_ids:
show_ids.append(int(trakt_item.pk[1]))
elif (isinstance(trakt_item, (Season, Episode))) and trakt_item.show.pk[1] not in show_ids:
show_ids.append(int(trakt_item.show.pk[1]))
if status_message:
logger.debug(f"Trakt {media_type} Found: {trakt_items}")
if status_message:

@ -25,7 +25,7 @@ class TVDbObj:
else:
raise Failed(f"TVDb Error: {tvdb_url} must begin with {TVDb.movies_url if is_movie else TVDb.series_url}")
response = TVDb.send_request(tvdb_url, language)
response = TVDb._request(tvdb_url, language)
results = response.xpath(f"//*[text()='TheTVDB.com {self.media_type} ID']/parent::node()/span/text()")
if len(results) > 0:
self.id = int(results[0])
@ -104,16 +104,16 @@ class TVDbAPI:
return TVDbObj(tvdb_url, language, True, self)
def get_list_description(self, tvdb_url, language):
description = self.send_request(tvdb_url, language).xpath("//div[@class='block']/div[not(@style='display:none')]/p/text()")
description = self._request(tvdb_url, language).xpath("//div[@class='block']/div[not(@style='display:none')]/p/text()")
return description[0] if len(description) > 0 and len(description[0]) > 0 else ""
def get_tvdb_ids_from_url(self, tvdb_url, language):
def _ids_from_url(self, tvdb_url, language):
show_ids = []
movie_ids = []
tvdb_url = tvdb_url.strip()
if tvdb_url.startswith((self.list_url, self.alt_list_url)):
try:
items = self.send_request(tvdb_url, language).xpath("//div[@class='col-xs-12 col-sm-12 col-md-8 col-lg-8 col-md-pull-4']/div[@class='row']")
items = self._request(tvdb_url, language).xpath("//div[@class='col-xs-12 col-sm-12 col-md-8 col-lg-8 col-md-pull-4']/div[@class='row']")
for item in items:
title = item.xpath(".//div[@class='col-xs-12 col-sm-9 mt-2']//a/text()")[0]
item_url = item.xpath(".//div[@class='col-xs-12 col-sm-9 mt-2']//a/@href")[0]
@ -143,7 +143,7 @@ class TVDbAPI:
raise Failed(f"TVDb Error: {tvdb_url} must begin with {self.list_url}")
@retry(stop_max_attempt_number=6, wait_fixed=10000)
def send_request(self, url, language):
def _request(self, url, language):
return html.fromstring(requests.get(url, headers={"Accept-Language": language}).content)
def get_items(self, method, data, language, status_message=True):
@ -157,7 +157,7 @@ class TVDbAPI:
elif method == "tvdb_movie":
movie_ids.append(self.get_movie(language, data).id)
elif method == "tvdb_list":
tmdb_ids, tvdb_ids = self.get_tvdb_ids_from_url(data, language)
tmdb_ids, tvdb_ids = self._ids_from_url(data, language)
movie_ids.extend(tmdb_ids)
show_ids.extend(tvdb_ids)
else:

Loading…
Cancel
Save