You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Plex-Meta-Manager/modules/mal.py

329 lines
16 KiB

import re, secrets, time, webbrowser
from datetime import datetime
from json import JSONDecodeError
4 years ago
from modules import util
from modules.util import Failed, TimeoutExpired, YAML
4 years ago
# Module-level alias for the logger object exposed by modules.util.
logger = util.logger
4 years ago
# Every collection builder name handled by this module.
builders = [
    "mal_id",
    "mal_all",
    "mal_airing",
    "mal_upcoming",
    "mal_tv",
    "mal_ova",
    "mal_movie",
    "mal_special",
    "mal_popular",
    "mal_favorite",
    "mal_season",
    "mal_suggested",
    "mal_userlist",
    "mal_genre",
    "mal_studio",
    "mal_search",
]

# Builder name -> value sent as the `ranking_type` query parameter.
mal_ranked_name = {
    "mal_all": "all",
    "mal_airing": "airing",
    "mal_upcoming": "upcoming",
    "mal_tv": "tv",
    "mal_ova": "ova",
    "mal_movie": "movie",
    "mal_special": "special",
    "mal_popular": "bypopularity",
    "mal_favorite": "favorite",
}

# Builder name -> human readable label used in log output.
mal_ranked_pretty = {
    "mal_all": "MyAnimeList All",
    "mal_airing": "MyAnimeList Airing",
    "mal_search": "MyAnimeList Search",
    "mal_upcoming": "MyAnimeList Upcoming",
    "mal_tv": "MyAnimeList TV",
    "mal_ova": "MyAnimeList OVA",
    "mal_movie": "MyAnimeList Movie",
    "mal_special": "MyAnimeList Special",
    "mal_popular": "MyAnimeList Popular",
    "mal_favorite": "MyAnimeList Favorite",
    "mal_genre": "MyAnimeList Genre",
    "mal_studio": "MyAnimeList Studio",
}
3 years ago
# Accepted season sort names -> the sort key passed to the season endpoint.
season_sort_translation = {
    "score": "anime_score",
    "anime_score": "anime_score",
    "members": "anime_num_list_users",
    "anime_num_list_users": "anime_num_list_users",
}
season_sort_options = ["score", "members"]

# Internal sort/status keys -> labels used when logging what is being processed.
pretty_names = {
    "anime_score": "Score",
    "list_score": "Score",
    "anime_num_list_users": "Members",
    "list_updated_at": "Last Updated",
    "anime_title": "Title",
    "anime_start_date": "Start Date",
    "all": "All Anime",
    "watching": "Currently Watching",
    "completed": "Completed",
    "on_hold": "On Hold",
    "dropped": "Dropped",
    "plan_to_watch": "Plan to Watch",
}

# Accepted user-list sort names -> the sort key passed to the animelist endpoint.
userlist_sort_translation = {
    "score": "list_score",
    "list_score": "list_score",
    "last_updated": "list_updated_at",
    "list_updated": "list_updated_at",
    "list_updated_at": "list_updated_at",
    "title": "anime_title",
    "anime_title": "anime_title",
    "start_date": "anime_start_date",
    "anime_start_date": "anime_start_date",
}
userlist_sort_options = ["score", "last_updated", "title", "start_date"]
userlist_status = ["all", "watching", "completed", "on_hold", "dropped", "plan_to_watch"]

# Allowed values for the search builder.
search_types = ["tv", "movie", "ova", "special", "ona", "music"]
search_status = ["airing", "complete", "upcoming"]
search_ratings = ["g", "pg", "pg13", "r17", "r", "rx"]
search_sorts = [
    "mal_id",
    "title",
    "type",
    "rating",
    "start_date",
    "end_date",
    "episodes",
    "score",
    "scored_by",
    "rank",
    "popularity",
    "members",
    "favorites",
]
# Every "<sort>.<direction>" pairing, descending listed before ascending for each sort.
search_combos = [
    f"{field}.{direction}"
    for field in search_sorts
    for direction in ("desc", "asc")
]
# Root of the official MyAnimeList v2 API and of the Jikan v4 API.
base_url = "https://api.myanimelist.net/v2/"
jikan_base_url = "https://api.jikan.moe/v4/"

# Fixed code verifier used by the authorization flow when a localhost URL is supplied.
uni_code_verifier = "k_UHwN_eHAPQVXiceC-rYGkozKqrJmKxPUIUOBIKo1noq_4XGRVCViP_dGcwB-fkPql8f56mmWj5aWCa2HDeugf6sRvnc9Rjhbb1vKGYLY0IwWsDNXRqXdksaVGJthux"

# Named endpoints; the API entries are derived from base_url.
urls = {
    "oauth_token": "https://myanimelist.net/v1/oauth2/token",
    "oauth_authorize": "https://myanimelist.net/v1/oauth2/authorize",
    "ranking": base_url + "anime/ranking",
    "season": base_url + "anime/season",
    "suggestions": base_url + "anime/suggestions",
    "user": base_url + "users",
}
3 years ago
class MyAnimeListObj:
    """Attribute view over one anime record.

    The record is either a flattened cache row (``cache=True``) or the
    ``data`` payload of a Jikan ``anime/<id>`` response (``cache=False``);
    the two differ in how ``aired``, ``genres`` and the studio are stored.
    """

    def __init__(self, mal, mal_id, data, cache=False):
        """Copy the fields of ``data`` onto the instance.

        Args:
            mal: The owning client object (stored, not otherwise used here).
            mal_id: MyAnimeList ID of the anime.
            data: Mapping holding the record fields.
            cache: True when ``data`` uses the flattened cache layout.

        Raises:
            KeyError: If ``data`` lacks one of the required keys.
        """
        self.mal = mal
        self.mal_id = mal_id
        self._data = data

        self.title = data["title"]
        self.title_english = data["title_english"]
        self.title_japanese = data["title_japanese"]
        self.status = data["status"]
        self.airing = data["airing"]

        # Cache rows hold a plain date string; API payloads hold a nested
        # ISO timestamp under "from". Unparseable or null values become None.
        try:
            if cache:
                aired_text, aired_format = data["aired"], "%Y-%m-%d"
            else:
                aired_text, aired_format = data["aired"]["from"], "%Y-%m-%dT%H:%M:%S%z"
            self.aired = datetime.strptime(aired_text, aired_format)
        except (TypeError, ValueError):
            self.aired = None

        self.rating = data["rating"]
        self.score = data["score"]
        self.rank = data["rank"]
        self.popularity = data["popularity"]

        # Cache rows join genre names with "|"; API payloads list dicts.
        raw_genres = data["genres"]
        if not raw_genres:
            self.genres = []
        elif cache:
            self.genres = raw_genres.split("|")
        else:
            self.genres = [genre["name"] for genre in raw_genres]

        # Only the first listed studio is kept for API payloads.
        if cache:
            self.studio = data["studio"]
        else:
            studio_list = data["studios"]
            self.studio = studio_list[0]["name"] if studio_list else None
4 years ago
class MyAnimeList:
    """Client for the MyAnimeList v2 API, with Jikan v4 used for lookups and search.

    Construction validates the stored authorization; if that fails a token
    refresh is attempted, and as a last resort the interactive OAuth flow runs.
    """

    def __init__(self, config, params):
        """Store settings and establish a working authorization.

        Args:
            config: Project config object; this class uses its ``get_json``,
                ``post_json``, ``read_only`` and ``Cache`` members.
            params: Mapping with ``client_id``, ``client_secret``,
                ``localhost_url``, ``config_path``, ``cache_expiration`` and
                ``authorization``.

        Raises:
            Failed: If no valid authorization could be obtained.
        """
        self.config = config
        self.client_id = params["client_id"]
        self.client_secret = params["client_secret"]
        self.localhost_url = params["localhost_url"]
        self.config_path = params["config_path"]
        self.expiration = params["cache_expiration"]
        self.authorization = params["authorization"]
        # Initialise lazy caches and the throttle timestamp before any request
        # is made so the instance is fully formed during authentication.
        self._genres = {}
        self._studios = {}
        self._delay = None
        logger.secret(self.client_secret)
        try:
            if not self._save(self.authorization):
                if not self._refresh():
                    self._authorization()
        except Exception as err:
            logger.stacktrace()
            raise Failed("MyAnimeList Error: Failed to Connect") from err

    @property
    def genres(self):
        """Lazily loaded genre lookup.

        Maps genre name, lower-cased name and string ID to the integer ID,
        and the integer ID back to the genre name.
        """
        if not self._genres:
            for data in self._jikan_request("genres/anime")["data"]:
                genre_id = int(data["mal_id"])
                self._genres[data["name"]] = genre_id
                self._genres[data["name"].lower()] = genre_id
                self._genres[str(data["mal_id"])] = genre_id
                self._genres[genre_id] = data["name"]
        return self._genres

    @property
    def studios(self):
        """Lazily loaded studio lookup built from the ``producers`` endpoint.

        Maps the first listed title, its lower-cased form and the string ID to
        the integer ID, and the integer ID back to the title.
        """
        if not self._studios:
            for data in self._pagination("producers"):
                studio_id = int(data["mal_id"])
                studio_name = data["titles"][0]["title"]
                self._studios[studio_name] = studio_id
                self._studios[studio_name.lower()] = studio_id
                self._studios[str(data["mal_id"])] = studio_id
                self._studios[studio_id] = studio_name
        return self._studios

    def _authorization(self):
        """Run the OAuth authorization-code exchange and store the new tokens.

        Raises:
            Failed: On input timeout, empty input, a URL without a ``code``
                parameter, an error response, or tokens that fail validation.
        """
        if self.localhost_url:
            code_verifier = uni_code_verifier
            url = self.localhost_url
        else:
            # NOTE(review): the source's indentation was lost; the interactive
            # prompt is assumed to belong to this branch only - confirm.
            code_verifier = secrets.token_urlsafe(100)[:128]
            url = f"{urls['oauth_authorize']}?response_type=code&client_id={self.client_id}&code_challenge={code_verifier}"
            logger.info("")
            logger.info(f"Navigate to: {url}")
            logger.info("")
            logger.info("Login and click the Allow option. You will then be redirected to a localhost")
            logger.info("url that most likely won't load, which is fine. Copy the URL and paste it below")
            webbrowser.open(url, new=2)
            try:
                url = util.logger_input("URL").strip()
            except TimeoutExpired:
                raise Failed("Input Timeout: URL required.")
        if not url:
            raise Failed("MyAnimeList Error: No input MyAnimeList code required.")
        match = re.search("code=([^&]+)", str(url))
        if not match:
            raise Failed("MyAnimeList Error: Invalid URL")
        code = match.group(1)
        data = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "code": code,
            "code_verifier": code_verifier,
            "grant_type": "authorization_code"
        }
        new_authorization = self._oauth(data)
        if "error" in new_authorization:
            raise Failed("MyAnimeList Error: Invalid code")
        if not self._save(new_authorization):
            raise Failed("MyAnimeList Error: New Authorization Failed")

    def _check(self, authorization):
        """Return True when a suggestions request succeeds with ``authorization``."""
        try:
            self._request(urls["suggestions"], authorization=authorization)
            return True
        except Failed as e:
            logger.debug(e)
            return False

    def _refresh(self):
        """Exchange the stored refresh token for new tokens.

        Returns:
            True when new tokens were obtained and validated, False when no
            refresh token is stored or the new tokens are rejected.
        """
        if self.authorization and "refresh_token" in self.authorization and self.authorization["refresh_token"]:
            logger.info("Refreshing Access Token...")
            data = {
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "refresh_token": self.authorization["refresh_token"],
                "grant_type": "refresh_token"
            }
            refreshed_authorization = self._oauth(data)
            return self._save(refreshed_authorization)
        return False

    def _save(self, authorization):
        """Validate ``authorization`` and adopt it, persisting it when it changed.

        The config file is only written when the tokens differ from the stored
        ones and the config is not read-only.

        Returns:
            True if the authorization is usable, False otherwise.
        """
        if authorization is not None and "access_token" in authorization and authorization["access_token"] and self._check(authorization):
            if self.authorization != authorization and not self.config.read_only:
                yaml = YAML(self.config_path)
                yaml.data["mal"]["authorization"] = {
                    "access_token": authorization["access_token"],
                    "token_type": authorization["token_type"],
                    "expires_in": authorization["expires_in"],
                    "refresh_token": authorization["refresh_token"]
                }
                logger.info(f"Saving authorization information to {self.config_path}")
                yaml.save()
            logger.secret(authorization["access_token"])
            self.authorization = authorization
            return True
        return False

    def _oauth(self, data):
        """POST ``data`` to the OAuth token endpoint and return the JSON reply."""
        return self.config.post_json(urls["oauth_token"], data=data)

    def _request(self, url, authorization=None):
        """GET ``url`` from the MyAnimeList API with a bearer token.

        Args:
            url: Full request URL.
            authorization: Optional token mapping to use instead of the stored one.

        Raises:
            Failed: If the reply contains an ``error`` key or is not valid JSON.
        """
        token = authorization["access_token"] if authorization else self.authorization["access_token"]
        logger.trace(f"URL: {url}")
        try:
            response = self.config.get_json(url, headers={"Authorization": f"Bearer {token}"})
            logger.trace(f"Response: {response}")
            if "error" in response:
                raise Failed(f"MyAnimeList Error: {response['error']}")
            return response
        except JSONDecodeError:
            raise Failed("MyAnimeList Error: Connection Failed")

    def _jikan_request(self, url, params=None):
        """GET ``url`` (relative to the Jikan base URL), spacing calls by one second.

        Args:
            url: Endpoint path appended to ``jikan_base_url``.
            params: Optional query parameters.

        Returns:
            The decoded JSON reply.
        """
        logger.trace(f"URL: {jikan_base_url}{url}")
        logger.trace(f"Params: {params}")
        if self._delay is not None:
            # Sleep for the remainder of the one-second window instead of
            # spinning on time.time(), which kept a CPU core busy.
            remaining = 1 - (time.time() - self._delay)
            if remaining > 0:
                time.sleep(remaining)
        data = self.config.get_json(f"{jikan_base_url}{url}", params=params)
        self._delay = time.time()
        return data

    def _parse_request(self, url, node=False):
        """Return the nodes (``node=True``) or node IDs from a list response.

        An empty list is returned when the reply has no ``data`` key.
        """
        data = self._request(url)
        if "data" not in data:
            return []
        return [d["node"] if node else d["node"]["id"] for d in data["data"]]

    def _username(self):
        """Return the name of the authenticated user."""
        return self._request(f"{urls['user']}/@me")["name"]

    def _ranked(self, ranking_type, limit):
        """Return up to ``limit`` IDs from the given ranking."""
        url = f"{urls['ranking']}?ranking_type={ranking_type}&limit={limit}"
        return self._parse_request(url)

    def _season(self, data):
        """Return anime IDs for a season.

        Args:
            data: Mapping with ``year``, ``season``, ``sort_by``,
                ``starting_only`` and ``limit``.

        Returns:
            IDs in response order, stopping once ``limit`` IDs are collected.
            With ``starting_only`` set, only anime whose start season matches
            the requested year and season are kept.
        """
        url = f"{urls['season']}/{data['year']}/{data['season']}?sort={data['sort_by']}&limit=500"
        if data["starting_only"]:
            url += "&fields=start_season"
        results = []
        for anime in self._parse_request(url, node=True):
            if data["starting_only"]:
                # A node without start_season cannot be confirmed as starting
                # this season; skip it rather than raising KeyError.
                start = anime.get("start_season") or {}
                if start.get("year") != data["year"] or start.get("season") != data["season"]:
                    continue
            results.append(anime["id"])
            if len(results) == data["limit"]:
                break
        return results

    def _suggestions(self, limit):
        """Return up to ``limit`` suggested anime IDs for the authenticated user."""
        url = f"{urls['suggestions']}?limit={limit}"
        return self._parse_request(url)

    def _userlist(self, username, status, sort_by, limit):
        """Return up to ``limit`` IDs from a user's anime list.

        The status filter is omitted from the query when ``status`` is ``"all"``.
        """
        final_status = "" if status == "all" else f"status={status}&"
        url = f"{urls['user']}/{username}/animelist?{final_status}sort={sort_by}&limit={limit}"
        return self._parse_request(url)

    def _pagination(self, endpoint, params=None, limit=None):
        """Collect the ``data`` items of a paginated Jikan endpoint.

        Args:
            endpoint: Jikan endpoint path.
            params: Optional query parameters; the mapping is not modified.
            limit: Maximum number of items. ``None`` returns every page; a
                value ``<= 0`` or above the reported total means "all".

        Returns:
            List of the raw item dicts, truncated to ``limit`` when set.

        Raises:
            Failed: If ``limit`` is given and the endpoint reports zero items,
                or if a page keeps coming back without ``data``.
        """
        data = self._jikan_request(endpoint, params)
        last_visible_page = data["pagination"]["last_visible_page"]
        if limit is not None:
            total_items = data["pagination"]["items"]["total"]
            if total_items == 0:
                raise Failed("MyAnimeList Error: No MyAnimeList IDs for Search")
            if total_items < limit or limit <= 0:
                limit = total_items
        per_page = len(data["data"])
        # Work on a copy so the caller's dict never gains a stale "page" key,
        # which would make a later run start from the wrong page.
        page_params = dict(params) if params else {}
        mal_ids = []
        current_page = 1
        chances = 0
        while current_page <= last_visible_page:
            if chances > 6:
                logger.debug(data)
                raise Failed("MyAnimeList Error: Connection Failed")
            start_num = (current_page - 1) * per_page + 1
            end_num = limit if limit and (current_page == last_visible_page or limit < start_num + per_page) else current_page * per_page
            logger.ghost(f"Parsing Page {current_page}/{last_visible_page} {start_num}-{end_num}")
            if current_page > 1:
                page_params["page"] = current_page
                data = self._jikan_request(endpoint, page_params)
            if "data" in data:
                chances = 0
                mal_ids.extend(data["data"])
                if limit and len(mal_ids) >= limit:
                    return mal_ids[:limit]
                current_page += 1
            else:
                # Same page is requested again on the next iteration.
                chances += 1
        logger.exorcise()
        return mal_ids

    def get_anime(self, mal_id):
        """Return a ``MyAnimeListObj`` for ``mal_id``, using the cache when fresh.

        Raises:
            Failed: If the Jikan reply cannot be decoded or has no ``data``.
        """
        expired = None
        if self.config.Cache:
            mal_dict, expired = self.config.Cache.query_mal(mal_id, self.expiration)
            if mal_dict and expired is False:
                return MyAnimeListObj(self, mal_id, mal_dict, cache=True)
        try:
            response = self._jikan_request(f"anime/{mal_id}")
        except JSONDecodeError:
            raise Failed("MyAnimeList Error: JSON Decoding Failed")
        if "data" not in response:
            raise Failed(f"MyAnimeList Error: No Anime found for MyAnimeList ID: {mal_id}")
        mal = MyAnimeListObj(self, mal_id, response["data"])
        if self.config.Cache:
            self.config.Cache.update_mal(expired, mal_id, mal, self.expiration)
        return mal

    def get_mal_ids(self, method, data):
        """Resolve a builder ``method`` and its ``data`` to a list of MyAnimeList IDs.

        Raises:
            Failed: If ``method`` is not one of the supported builders.
        """
        if method == "mal_id":
            logger.info(f"Processing MyAnimeList ID: {data}")
            mal_ids = [data]
        elif method in mal_ranked_name:
            logger.info(f"Processing {mal_ranked_pretty[method]}: {data} Anime")
            mal_ids = self._ranked(mal_ranked_name[method], data)
        elif method == "mal_search":
            # data is (query params, log description, limit).
            logger.info(f"Processing {data[1]}")
            mal_ids = [mal_data["mal_id"] for mal_data in self._pagination("anime", params=data[0], limit=data[2])]
        elif method == "mal_season":
            logger.info(f"Processing MyAnimeList Season: {data['limit']} Anime from {data['season'].title()} {data['year']} sorted by {pretty_names[data['sort_by']]}")
            mal_ids = self._season(data)
        elif method == "mal_suggested":
            logger.info(f"Processing MyAnimeList Suggested: {data} Anime")
            mal_ids = self._suggestions(data)
        elif method == "mal_userlist":
            logger.info(f"Processing MyAnimeList UserList: {data['limit']} Anime from {self._username() if data['username'] == '@me' else data['username']}'s {pretty_names[data['status']]} list sorted by {pretty_names[data['sort_by']]}")
            mal_ids = self._userlist(data["username"], data["status"], data["sort_by"], data["limit"])
        else:
            raise Failed(f"MyAnimeList Error: Method {method} not supported")
        logger.debug("")
        logger.debug(f"{len(mal_ids)} MyAnimeList IDs Found")
        logger.trace(f"IDs: {mal_ids}")
        return mal_ids