|
|
|
@ -1,16 +1,13 @@
|
|
|
|
|
import logging, requests, webbrowser
|
|
|
|
|
from modules import util
|
|
|
|
|
from modules.util import Failed, TimeoutExpired
|
|
|
|
|
from retrying import retry
|
|
|
|
|
from ruamel import yaml
|
|
|
|
|
from trakt import Trakt as TraktAPI
|
|
|
|
|
from trakt.objects.episode import Episode
|
|
|
|
|
from trakt.objects.movie import Movie
|
|
|
|
|
from trakt.objects.season import Season
|
|
|
|
|
from trakt.objects.show import Show
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger("Plex Meta Manager")
|
|
|
|
|
|
|
|
|
|
redirect_uri = "urn:ietf:wg:oauth:2.0:oob"
|
|
|
|
|
redirect_uri_encoded = redirect_uri.replace(":", "%3A")
|
|
|
|
|
base_url = "https://api.trakt.tv"
|
|
|
|
|
builders = [
|
|
|
|
|
"trakt_collected",
|
|
|
|
|
"trakt_collection",
|
|
|
|
@ -26,49 +23,59 @@ builders = [
|
|
|
|
|
class Trakt:
|
|
|
|
|
def __init__(self, config, params, authorization=None):
|
|
|
|
|
self.config = config
|
|
|
|
|
self.base_url = "https://api.trakt.tv"
|
|
|
|
|
self.redirect_uri = "urn:ietf:wg:oauth:2.0:oob"
|
|
|
|
|
self.aliases = {
|
|
|
|
|
"trakt_trending": "Trakt Trending",
|
|
|
|
|
"trakt_watchlist": "Trakt Watchlist",
|
|
|
|
|
"trakt_list": "Trakt List"
|
|
|
|
|
}
|
|
|
|
|
self.client_id = params["client_id"]
|
|
|
|
|
self.client_secret = params["client_secret"]
|
|
|
|
|
self.config_path = params["config_path"]
|
|
|
|
|
self.authorization = authorization
|
|
|
|
|
TraktAPI.configuration.defaults.client(self.client_id, self.client_secret)
|
|
|
|
|
if not self._save(self.authorization):
|
|
|
|
|
if not self._refresh():
|
|
|
|
|
self._authorization()
|
|
|
|
|
|
|
|
|
|
def _authorization(self):
|
|
|
|
|
url = TraktAPI["oauth"].authorize_url(self.redirect_uri)
|
|
|
|
|
url = f"https://trakt.tv/oauth/authorize?response_type=code&client_id={self.client_id}&redirect_uri={redirect_uri_encoded}"
|
|
|
|
|
logger.info(f"Navigate to: {url}")
|
|
|
|
|
logger.info("If you get an OAuth error your client_id or client_secret is invalid")
|
|
|
|
|
webbrowser.open(url, new=2)
|
|
|
|
|
try: pin = util.logger_input("Trakt pin (case insensitive)", timeout=300).strip()
|
|
|
|
|
except TimeoutExpired: raise Failed("Input Timeout: Trakt pin required.")
|
|
|
|
|
if not pin: raise Failed("Trakt Error: No input Trakt pin required.")
|
|
|
|
|
new_authorization = TraktAPI["oauth"].token(pin, self.redirect_uri)
|
|
|
|
|
if not new_authorization:
|
|
|
|
|
json = {
|
|
|
|
|
"code": pin,
|
|
|
|
|
"client_id": self.client_id,
|
|
|
|
|
"client_secret": self.client_secret,
|
|
|
|
|
"redirect_uri": redirect_uri,
|
|
|
|
|
"grant_type": "authorization_code"
|
|
|
|
|
}
|
|
|
|
|
response = self.config.post(f"{base_url}/oauth/token", json=json, headers={"Content-Type": "application/json"})
|
|
|
|
|
if response.status_code != 200:
|
|
|
|
|
raise Failed("Trakt Error: Invalid trakt pin. If you're sure you typed it in correctly your client_id or client_secret may be invalid")
|
|
|
|
|
if not self._save(new_authorization):
|
|
|
|
|
elif not self._save(response.json()):
|
|
|
|
|
raise Failed("Trakt Error: New Authorization Failed")
|
|
|
|
|
|
|
|
|
|
def _check(self, authorization):
|
|
|
|
|
try:
|
|
|
|
|
with TraktAPI.configuration.oauth.from_response(authorization, refresh=True):
|
|
|
|
|
if TraktAPI["users/settings"].get():
|
|
|
|
|
return True
|
|
|
|
|
except ValueError: pass
|
|
|
|
|
return False
|
|
|
|
|
def _check(self, authorization=None):
|
|
|
|
|
headers = {
|
|
|
|
|
"Content-Type": "application/json",
|
|
|
|
|
"Authorization": f"Bearer {self.authorization['access_token'] if authorization is None else authorization['access_token']}",
|
|
|
|
|
"trakt-api-version": "2",
|
|
|
|
|
"trakt-api-key": self.client_id
|
|
|
|
|
}
|
|
|
|
|
response = self.config.get(f"{base_url}/users/settings", headers=headers)
|
|
|
|
|
return response.status_code == 200
|
|
|
|
|
|
|
|
|
|
def _refresh(self):
|
|
|
|
|
if self.authorization and "refresh_token" in self.authorization and self.authorization["refresh_token"]:
|
|
|
|
|
logger.info("Refreshing Access Token...")
|
|
|
|
|
refreshed_authorization = TraktAPI["oauth"].token_refresh(self.authorization["refresh_token"], self.redirect_uri)
|
|
|
|
|
return self._save(refreshed_authorization)
|
|
|
|
|
json = {
|
|
|
|
|
"refresh_token": self.authorization["refresh_token"],
|
|
|
|
|
"client_id": self.client_id,
|
|
|
|
|
"client_secret": self.client_secret,
|
|
|
|
|
"redirect_uri": redirect_uri,
|
|
|
|
|
"grant_type": "refresh_token"
|
|
|
|
|
}
|
|
|
|
|
response = self.config.post(f"{base_url}/oauth/token", json=json, headers={"Content-Type": "application/json"})
|
|
|
|
|
if response.status_code != 200:
|
|
|
|
|
return False
|
|
|
|
|
return self._save(response.json())
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def _save(self, authorization):
|
|
|
|
@ -87,99 +94,89 @@ class Trakt:
|
|
|
|
|
logger.info(f"Saving authorization information to {self.config_path}")
|
|
|
|
|
yaml.round_trip_dump(config, open(self.config_path, "w"), indent=ind, block_seq_indent=bsi)
|
|
|
|
|
self.authorization = authorization
|
|
|
|
|
TraktAPI.configuration.defaults.oauth.from_response(self.authorization)
|
|
|
|
|
return True
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
|
|
|
|
|
def _request(self, url):
|
|
|
|
|
headers = {
|
|
|
|
|
"Content-Type": "application/json",
|
|
|
|
|
"Authorization": f"Bearer {self.authorization['access_token']}",
|
|
|
|
|
"trakt-api-version": "2",
|
|
|
|
|
"trakt-api-key": self.client_id
|
|
|
|
|
}
|
|
|
|
|
response = self.config.get(url, headers=headers)
|
|
|
|
|
if response.status_code == 200:
|
|
|
|
|
return response.json()
|
|
|
|
|
else:
|
|
|
|
|
raise Failed(f"({response.status_code}) {response.reason}")
|
|
|
|
|
|
|
|
|
|
def convert(self, external_id, from_source, to_source, media_type):
|
|
|
|
|
lookup = TraktAPI["search"].lookup(external_id, from_source, media_type)
|
|
|
|
|
if lookup:
|
|
|
|
|
lookup = lookup[0] if isinstance(lookup, list) else lookup
|
|
|
|
|
if lookup.get_key(to_source):
|
|
|
|
|
return lookup.get_key(to_source) if to_source == "imdb" else int(lookup.get_key(to_source))
|
|
|
|
|
path = f"/search/{from_source}/{external_id}"
|
|
|
|
|
if from_source in ["tmdb", "tvdb"]:
|
|
|
|
|
path = f"{path}?type={media_type}"
|
|
|
|
|
lookup = self._request(f"{base_url}{path}")
|
|
|
|
|
if lookup and media_type in lookup[0] and to_source in lookup[0][media_type]["ids"]:
|
|
|
|
|
return lookup[0][media_type]["ids"][to_source]
|
|
|
|
|
raise Failed(f"Trakt Error: No {to_source.upper().replace('B', 'b')} ID found for {from_source.upper().replace('B', 'b')} ID: {external_id}")
|
|
|
|
|
|
|
|
|
|
def collection(self, data, is_movie):
|
|
|
|
|
return self._user_list("collection", data, is_movie)
|
|
|
|
|
|
|
|
|
|
def _watchlist(self, data, is_movie):
|
|
|
|
|
return self._user_list("watchlist", data, is_movie)
|
|
|
|
|
def list_description(self, data):
|
|
|
|
|
try:
|
|
|
|
|
return self._request(f"{base_url}{requests.utils.urlparse(data).path}")["description"]
|
|
|
|
|
except Failed:
|
|
|
|
|
raise Failed(f"Trakt Error: List {data} not found")
|
|
|
|
|
|
|
|
|
|
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
|
|
|
|
|
def _user_list(self, list_type, data, is_movie):
|
|
|
|
|
items = TraktAPI[f"users/{data}/{list_type}"].movies() if is_movie else TraktAPI[f"users/{data}/{list_type}"].shows()
|
|
|
|
|
if items is None: raise Failed("Trakt Error: No List found")
|
|
|
|
|
else: return [i for i in items]
|
|
|
|
|
|
|
|
|
|
@retry(stop_max_attempt_number=6, wait_fixed=10000, retry_on_exception=util.retry_if_not_failed)
|
|
|
|
|
def standard_list(self, data):
|
|
|
|
|
try: trakt_list = TraktAPI[requests.utils.urlparse(data).path].get()
|
|
|
|
|
except AttributeError: trakt_list = None
|
|
|
|
|
if trakt_list is None: raise Failed("Trakt Error: No List found")
|
|
|
|
|
else: return trakt_list
|
|
|
|
|
|
|
|
|
|
def _request(self, url):
|
|
|
|
|
return self.config.get_json(url, headers={"Content-Type": "application/json", "trakt-api-version": "2", "trakt-api-key": self.client_id})
|
|
|
|
|
|
|
|
|
|
def _collection(self, username, is_movie):
|
|
|
|
|
items = self._request(f"{self.base_url}/users/{username}/collection/{'movies' if is_movie else 'shows'}")
|
|
|
|
|
path = f"{requests.utils.urlparse(data).path}/items" if list_type == "list" else f"/users/{data}/{list_type}"
|
|
|
|
|
try:
|
|
|
|
|
items = self._request(f"{base_url}{path}/{'movies' if is_movie else 'shows'}")
|
|
|
|
|
except Failed:
|
|
|
|
|
raise Failed(f"Trakt Error: {'List' if list_type == 'list' else 'User'} {data} not found")
|
|
|
|
|
if len(items) == 0:
|
|
|
|
|
if list_type == "list":
|
|
|
|
|
raise Failed(f"Trakt Error: List {data} is empty")
|
|
|
|
|
else:
|
|
|
|
|
raise Failed(f"Trakt Error: {data}'s {list_type.capitalize()} is empty")
|
|
|
|
|
if is_movie: return [item["movie"]["ids"]["tmdb"] for item in items], []
|
|
|
|
|
else: return [], [item["show"]["ids"]["tvdb"] for item in items]
|
|
|
|
|
|
|
|
|
|
def _pagenation(self, pagenation, amount, is_movie):
|
|
|
|
|
items = self._request(f"{self.base_url}/{'movies' if is_movie else 'shows'}/{pagenation}?limit={amount}")
|
|
|
|
|
items = self._request(f"{base_url}/{'movies' if is_movie else 'shows'}/{pagenation}?limit={amount}")
|
|
|
|
|
if pagenation == "popular" and is_movie: return [item["ids"]["tmdb"] for item in items], []
|
|
|
|
|
elif pagenation == "popular": return [], [item["ids"]["tvdb"] for item in items]
|
|
|
|
|
elif is_movie: return [item["movie"]["ids"]["tmdb"] for item in items], []
|
|
|
|
|
else: return [], [item["show"]["ids"]["tvdb"] for item in items]
|
|
|
|
|
|
|
|
|
|
def validate_trakt(self, values, trakt_type=None, is_movie=None):
|
|
|
|
|
def validate_trakt(self, values, is_movie, trakt_type="list"):
|
|
|
|
|
trakt_values = []
|
|
|
|
|
for value in values:
|
|
|
|
|
try:
|
|
|
|
|
if trakt_type == "watchlist" and is_movie is not None:
|
|
|
|
|
self._watchlist(value, is_movie)
|
|
|
|
|
elif trakt_type == "collection" and is_movie is not None:
|
|
|
|
|
self._collection(value, is_movie)
|
|
|
|
|
else:
|
|
|
|
|
self.standard_list(value)
|
|
|
|
|
self._user_list(trakt_type, value, is_movie)
|
|
|
|
|
trakt_values.append(value)
|
|
|
|
|
except Failed as e:
|
|
|
|
|
logger.error(e)
|
|
|
|
|
if len(trakt_values) == 0:
|
|
|
|
|
if trakt_type == "watchlist" and is_movie is not None:
|
|
|
|
|
if trakt_type == "watchlist":
|
|
|
|
|
raise Failed(f"Trakt Error: No valid Trakt Watchlists in {values}")
|
|
|
|
|
elif trakt_type == "collection" and is_movie is not None:
|
|
|
|
|
elif trakt_type == "collection":
|
|
|
|
|
raise Failed(f"Trakt Error: No valid Trakt Collections in {values}")
|
|
|
|
|
else:
|
|
|
|
|
raise Failed(f"Trakt Error: No valid Trakt Lists in {values}")
|
|
|
|
|
return trakt_values
|
|
|
|
|
|
|
|
|
|
def get_items(self, method, data, is_movie):
|
|
|
|
|
pretty = self.aliases[method] if method in self.aliases else method
|
|
|
|
|
pretty = util.pretty_names[method] if method in util.pretty_names else method
|
|
|
|
|
media_type = "Movie" if is_movie else "Show"
|
|
|
|
|
if method in ["trakt_trending", "trakt_popular", "trakt_recommended", "trakt_watched", "trakt_collected"]:
|
|
|
|
|
movie_ids, show_ids = self._pagenation(method[6:], data, is_movie)
|
|
|
|
|
logger.info(f"Processing {pretty}: {data} {media_type}{'' if data == 1 else 's'}")
|
|
|
|
|
elif method == "trakt_collection":
|
|
|
|
|
movie_ids, show_ids = self._collection(data, is_movie)
|
|
|
|
|
elif method in ["trakt_collection", "trakt_watchlist"]:
|
|
|
|
|
movie_ids, show_ids = self._user_list(method[6:], data, is_movie)
|
|
|
|
|
logger.info(f"Processing {pretty} {media_type}s for {data}")
|
|
|
|
|
else:
|
|
|
|
|
show_ids = []
|
|
|
|
|
movie_ids = []
|
|
|
|
|
if method == "trakt_watchlist": trakt_items = self._watchlist(data, is_movie)
|
|
|
|
|
elif method == "trakt_list": trakt_items = self.standard_list(data).items()
|
|
|
|
|
else: raise Failed(f"Trakt Error: Method {method} not supported")
|
|
|
|
|
elif method == "trakt_list":
|
|
|
|
|
movie_ids, show_ids = self._user_list(method[6:], data, is_movie)
|
|
|
|
|
logger.info(f"Processing {pretty}: {data}")
|
|
|
|
|
for trakt_item in trakt_items:
|
|
|
|
|
if isinstance(trakt_item, Movie):
|
|
|
|
|
movie_ids.append(int(trakt_item.get_key("tmdb")))
|
|
|
|
|
elif isinstance(trakt_item, Show) and trakt_item.pk[1] not in show_ids:
|
|
|
|
|
show_ids.append(int(trakt_item.pk[1]))
|
|
|
|
|
elif (isinstance(trakt_item, (Season, Episode))) and trakt_item.show.pk[1] not in show_ids:
|
|
|
|
|
show_ids.append(int(trakt_item.show.pk[1]))
|
|
|
|
|
logger.debug(f"Trakt {media_type} Found: {trakt_items}")
|
|
|
|
|
else:
|
|
|
|
|
raise Failed(f"Trakt Error: Method {method} not supported")
|
|
|
|
|
logger.debug("")
|
|
|
|
|
logger.debug(f"{len(movie_ids)} TMDb IDs Found: {movie_ids}")
|
|
|
|
|
logger.debug(f"{len(show_ids)} TVDb IDs Found: {show_ids}")
|