added flixpatrol builders

pull/465/head
meisnate12 3 years ago
parent 62532535c9
commit d223da35cf

@ -1,6 +1,6 @@
import logging, os, re import logging, os, re
from datetime import datetime, timedelta from datetime import datetime, timedelta
from modules import anidb, anilist, icheckmovies, imdb, letterboxd, mal, plex, radarr, sonarr, stevenlu, tautulli, tmdb, trakt, tvdb, util from modules import anidb, anilist, flixpatrol, icheckmovies, imdb, letterboxd, mal, plex, radarr, sonarr, stevenlu, tautulli, tmdb, trakt, tvdb, util
from modules.util import Failed, ImageData, NotScheduled from modules.util import Failed, ImageData, NotScheduled
from PIL import Image from PIL import Image
from plexapi.exceptions import BadRequest, NotFound from plexapi.exceptions import BadRequest, NotFound
@ -63,8 +63,9 @@ filter_translation = {
"writer": "writers" "writer": "writers"
} }
modifier_alias = {".greater": ".gt", ".less": ".lt"} modifier_alias = {".greater": ".gt", ".less": ".lt"}
all_builders = anidb.builders + anilist.builders + icheckmovies.builders + imdb.builders + letterboxd.builders + \ all_builders = anidb.builders + anilist.builders + flixpatrol.builders + icheckmovies.builders + imdb.builders + \
mal.builders + plex.builders + stevenlu.builders + tautulli.builders + tmdb.builders + trakt.builders + tvdb.builders letterboxd.builders + mal.builders + plex.builders + stevenlu.builders + tautulli.builders + \
tmdb.builders + trakt.builders + tvdb.builders
show_only_builders = ["tmdb_network", "tmdb_show", "tmdb_show_details", "tvdb_show", "tvdb_show_details", "collection_level"] show_only_builders = ["tmdb_network", "tmdb_show", "tmdb_show_details", "tvdb_show", "tvdb_show_details", "collection_level"]
movie_only_builders = [ movie_only_builders = [
"letterboxd_list", "letterboxd_list_details", "icheckmovies_list", "icheckmovies_list_details", "stevenlu_popular", "letterboxd_list", "letterboxd_list_details", "icheckmovies_list", "icheckmovies_list_details", "stevenlu_popular",
@ -570,6 +571,7 @@ class CollectionBuilder:
elif method_name in sonarr_details: self._sonarr(method_name, method_data) elif method_name in sonarr_details: self._sonarr(method_name, method_data)
elif method_name in anidb.builders: self._anidb(method_name, method_data) elif method_name in anidb.builders: self._anidb(method_name, method_data)
elif method_name in anilist.builders: self._anilist(method_name, method_data) elif method_name in anilist.builders: self._anilist(method_name, method_data)
elif method_name in flixpatrol.builders: self._flixpatrol(method_name, method_data)
elif method_name in icheckmovies.builders: self._icheckmovies(method_name, method_data) elif method_name in icheckmovies.builders: self._icheckmovies(method_name, method_data)
elif method_name in letterboxd.builders: self._letterboxd(method_name, method_data) elif method_name in letterboxd.builders: self._letterboxd(method_name, method_data)
elif method_name in imdb.builders: self._imdb(method_name, method_data) elif method_name in imdb.builders: self._imdb(method_name, method_data)
@ -861,6 +863,38 @@ class CollectionBuilder:
new_dictionary["limit"] = util.parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name) new_dictionary["limit"] = util.parse("limit", dict_data, datatype="int", methods=dict_methods, default=0, parent=method_name)
self.builders.append((method_name, new_dictionary)) self.builders.append((method_name, new_dictionary))
def _flixpatrol(self, method_name, method_data):
if method_name.startswith("flixpatrol_url"):
flixpatrol_lists = self.config.FlixPatrol.validate_flixpatrol_lists(method_data, self.language, self.library.is_movie)
for flixpatrol_list in flixpatrol_lists:
self.builders.append(("flixpatrol_url", flixpatrol_list))
elif method_name in flixpatrol.builders:
for dict_data, dict_methods in util.parse(method_name, method_data, datatype="dictlist"):
if method_name == "flixpatrol_demographics":
data = {
"generation": util.parse("generation", dict_data, methods=dict_methods, parent=method_name, default="all", options=flixpatrol.generations),
"gender": util.parse("gender", dict_data, methods=dict_methods, parent=method_name, default="all", options=flixpatrol.gender),
"location": util.parse("location", dict_data, methods=dict_methods, parent=method_name, default="world", options=flixpatrol.demo_locations),
"limit": util.parse("limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10)
}
elif method_name == "flixpatrol_popular":
data = {
"source": util.parse("source", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.popular),
"time_window": util.parse("time_window", dict_data, methods=dict_methods, parent=method_name, default="today"),
"limit": util.parse("limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10)
}
elif method_name == "flixpatrol_top":
data = {
"platform": util.parse("platform", dict_data, methods=dict_methods, parent=method_name, options=flixpatrol.platforms),
"location": util.parse("location", dict_data, methods=dict_methods, parent=method_name, default="world", options=flixpatrol.locations),
"time_window": util.parse("time_window", dict_data, methods=dict_methods, parent=method_name, default="today"),
"limit": util.parse("limit", dict_data, datatype="int", methods=dict_methods, parent=method_name, default=10)
}
else:
continue
if self.config.FlixPatrol.validate_flixpatrol_dict(method_name, data, self.language, self.library.is_movie):
self.builders.append((method_name, data))
def _icheckmovies(self, method_name, method_data): def _icheckmovies(self, method_name, method_data):
if method_name.startswith("icheckmovies_list"): if method_name.startswith("icheckmovies_list"):
icheckmovies_lists = self.config.ICheckMovies.validate_icheckmovies_lists(method_data, self.language) icheckmovies_lists = self.config.ICheckMovies.validate_icheckmovies_lists(method_data, self.language)
@ -1133,6 +1167,8 @@ class CollectionBuilder:
ids = self.config.TVDb.get_tvdb_ids(method, value) ids = self.config.TVDb.get_tvdb_ids(method, value)
elif "imdb" in method: elif "imdb" in method:
ids = self.config.IMDb.get_imdb_ids(method, value, self.language) ids = self.config.IMDb.get_imdb_ids(method, value, self.language)
elif "flixpatrol" in method:
ids = self.config.FlixPatrol.get_flixpatrol_ids(method, value, self.language, self.library.is_movie)
elif "icheckmovies" in method: elif "icheckmovies" in method:
ids = self.config.ICheckMovies.get_icheckmovies_ids(method, value, self.language) ids = self.config.ICheckMovies.get_icheckmovies_ids(method, value, self.language)
elif "letterboxd" in method: elif "letterboxd" in method:

@ -60,6 +60,14 @@ class Cache:
tmdb_id TEXT, tmdb_id TEXT,
expiration_date TEXT)""" expiration_date TEXT)"""
) )
cursor.execute(
"""CREATE TABLE IF NOT EXISTS flixpatrol_map (
key INTEGER PRIMARY KEY,
flixpatrol_id TEXT UNIQUE,
tmdb_id TEXT,
media_type TEXT,
expiration_date TEXT)"""
)
cursor.execute( cursor.execute(
"""CREATE TABLE IF NOT EXISTS omdb_data ( """CREATE TABLE IF NOT EXISTS omdb_data (
key INTEGER PRIMARY KEY, key INTEGER PRIMARY KEY,
@ -161,6 +169,12 @@ class Cache:
def update_letterboxd_map(self, expired, letterboxd_id, tmdb_id): def update_letterboxd_map(self, expired, letterboxd_id, tmdb_id):
self._update_map("letterboxd_map", "letterboxd_id", letterboxd_id, "tmdb_id", tmdb_id, expired) self._update_map("letterboxd_map", "letterboxd_id", letterboxd_id, "tmdb_id", tmdb_id, expired)
def query_flixpatrol_map(self, flixpatrol_id, media_type):
return self._query_map("flixpatrol_map", flixpatrol_id, "flixpatrol_id", "tmdb_id", media_type=media_type)
def update_flixpatrol_map(self, expired, flixpatrol_id, tmdb_id, media_type):
self._update_map("flixpatrol_map", "flixpatrol_id", flixpatrol_id, "tmdb_id", tmdb_id, expired, media_type=media_type)
def _query_map(self, map_name, _id, from_id, to_id, media_type=None, return_type=False): def _query_map(self, map_name, _id, from_id, to_id, media_type=None, return_type=False):
id_to_return = None id_to_return = None
expired = None expired = None

@ -6,6 +6,7 @@ from modules.anidb import AniDB
from modules.anilist import AniList from modules.anilist import AniList
from modules.cache import Cache from modules.cache import Cache
from modules.convert import Convert from modules.convert import Convert
from modules.flixpatrol import FlixPatrol
from modules.icheckmovies import ICheckMovies from modules.icheckmovies import ICheckMovies
from modules.imdb import IMDb from modules.imdb import IMDb
from modules.letterboxd import Letterboxd from modules.letterboxd import Letterboxd
@ -320,8 +321,9 @@ class Config:
self.IMDb = IMDb(self) self.IMDb = IMDb(self)
self.Convert = Convert(self) self.Convert = Convert(self)
self.AniList = AniList(self) self.AniList = AniList(self)
self.Letterboxd = Letterboxd(self) self.FlixPatrol = FlixPatrol(self)
self.ICheckMovies = ICheckMovies(self) self.ICheckMovies = ICheckMovies(self)
self.Letterboxd = Letterboxd(self)
self.StevenLu = StevenLu(self) self.StevenLu = StevenLu(self)
util.separator() util.separator()

@ -0,0 +1,162 @@
import logging
from datetime import datetime, timedelta
from modules import util
from modules.util import Failed
logger = logging.getLogger("Plex Meta Manager")
builders = ["flixpatrol_url", "flixpatrol_demographics", "flixpatrol_popular", "flixpatrol_top"]
generations = ["all", "boomers", "x", "y", "z"]
generations_translation = {"all": "all-generations", "boomers": "baby-boomers", "x": "generation-x", "y": "generation-y", "z": "generation-z"}
generations_pretty = {"all": "All generations", "boomers": "Baby Boomers", "x": "Generation X", "y": "Generation Y (Millenials)", "z": "Generation Z"}
gender = ["all", "men", "women"]
demo_locations = ["world", "brazil", "canada", "france", "germany", "india", "mexico", "united_kingdom", "united_states"]
locations = [
"albania", "argentina", "armenia", "australia", "austria", "azerbaijan", "bahamas", "bahrain", "bangladesh",
"belarus", "belgium", "belize", "benin", "bolivia", "bosnia_and_herzegovina", "botswana", "brazil", "bulgaria",
"burkina_faso", "cambodia", "canada", "chile", "colombia", "costa_rica", "croatia", "cyprus", "czech_republic",
"denmark", "dominican_republic", "ecuador", "egypt", "estonia", "finland", "france", "gabon", "germany", "ghana",
"greece", "guatemala", "guinea_bissau", "haiti", "honduras", "hong_kong", "hungary", "iceland", "india",
"indonesia", "ireland", "israel", "italy", "ivory_coast", "jamaica", "japan", "jordan", "kazakhstan", "kenya",
"kuwait", "kyrgyzstan", "laos", "latvia", "lebanon", "lithuania", "luxembourg", "malaysia", "maldives", "mali",
"malta", "mexico", "moldova", "mongolia", "montenegro", "morocco", "mozambique", "namibia", "netherlands",
"new_zealand", "nicaragua", "niger", "nigeria", "north_macedonia", "norway", "oman", "pakistan", "panama",
"papua_new_guinea", "paraguay", "peru", "philippines", "poland", "portugal", "qatar", "romania", "russia",
"rwanda", "salvador", "saudi_arabia", "senegal", "serbia", "singapore", "slovakia", "slovenia", "south_africa",
"south_korea", "spain", "sri_lanka", "sweden", "switzerland", "taiwan", "tajikistan", "tanzania", "thailand",
"togo", "trinidad_and_tobago", "turkey", "turkmenistan", "uganda", "ukraine", "united_arab_emirates",
"united_kingdom", "united_states", "uruguay", "uzbekistan", "venezuela", "vietnam", "zambia", "zimbabwe"
]
popular = ["movie_db", "facebook", "google", "twitter", "twitter_trends", "instagram", "instagram_trends", "youtube", "imdb", "letterboxd", "rotten_tomatoes", "tmdb", "trakt"]
platforms = ["netflix", "hbo", "disney", "amazon", "itunes", "google", "paramount_plus", "hulu", "vudu", "imdb", "amazon_prime", "star_plus"]
base_url = "https://flixpatrol.com"
urls = {
"top10": f"{base_url}/top10/",
"popular_movies": f"{base_url}/popular/movies/",
"popular_shows": f"{base_url}/popular/tv-shows/",
"demographics": f"{base_url}/demographics/"
}
class FlixPatrol:
def __init__(self, config):
self.config = config
def _request(self, url, language, xpath):
if self.config.trace_mode:
logger.debug(f"URL: {url}")
return self.config.get_html(url, headers=util.header(language)).xpath(xpath)
def _tmdb(self, flixpatrol_url, language):
ids = self._request(flixpatrol_url, language, "//script[@type='application/ld+json']/text()")
if len(ids) > 0 and ids[0]:
if "https://www.themoviedb.org" in ids[0]:
return util.regex_first_int(ids[0].split("https://www.themoviedb.org")[1], "TMDB Movie ID")
raise Failed(f"FlixPatrol Error: TMDb Movie ID not found in {ids[0]}")
raise Failed(f"FlixPatrol Error: TMDb Movie ID not found at {flixpatrol_url}")
def _parse_list(self, list_url, language, is_movie):
flixpatrol_urls = []
if list_url.startswith(urls["top10"]):
platform = list_url[len(urls["top10"]):].split("/")[0]
flixpatrol_urls = self._request(
list_url, language,
f"//div[@id='{platform}-{'1' if is_movie else '2'}']//a[@class='hover:underline']/@href"
)
logger.info(flixpatrol_urls)
if not flixpatrol_urls:
flixpatrol_urls = self._request(
list_url, language,
f"//h3[text() = '{'TOP 10 Movies' if is_movie else 'TOP 10 TV Shows'}']/following-sibling::div//a[@class='hover:underline']/@href"
)
logger.info(flixpatrol_urls)
elif list_url.startswith(tuple([v for k, v in urls.items()])):
flixpatrol_urls = self._request(
list_url, language,
f"//a[@class='flex group' and .//span[.='{'Movie' if is_movie else 'TV Show'}']]/@href"
)
return flixpatrol_urls
def validate_flixpatrol_lists(self, flixpatrol_lists, language, is_movie):
valid_lists = []
print(flixpatrol_lists)
for flixpatrol_list in util.get_list(flixpatrol_lists, split=False):
list_url = flixpatrol_list.strip()
if not list_url.startswith(tuple([v for k, v in urls.items()])):
fails = "\n".join([f"{v} (For {k.replace('_', ' ').title()})" for k, v in urls.items()])
raise Failed(f"FlixPatrol Error: {list_url} must begin with either:{fails}")
elif len(self._parse_list(list_url, language, is_movie)) > 0:
valid_lists.append(list_url)
else:
raise Failed(f"FlixPatrol Error: {list_url} failed to parse")
print(valid_lists)
return valid_lists
def validate_flixpatrol_dict(self, method, data, language, is_movie):
return len(self.validate_flixpatrol_lists(self.get_url(method, data, is_movie), language, is_movie)) > 0
def get_url(self, method, data, is_movie):
if method == "flixpatrol_demographics":
return f"{urls['demographics']}" \
f"{generations_translation[data['generation']]}/" \
f"{'all-genders' if data['gender'] == 'all' else data['gender']}/" \
f"{data['location'].replace('_', '-')}/"
elif method == "flixpatrol_popular":
return f"{urls['popular_movies'] if is_movie else urls['popular_shows']}" \
f"{data['source'].replace('_', '-')}/" \
f"{util.time_window(data['time_window'])}/"
elif method == "flixpatrol_top":
return f"{urls['top10']}" \
f"{data['platform'].replace('_', '-')}/" \
f"{data['location'].replace('_', '-')}/" \
f"{util.time_window(data['time_window'])}/full/"
elif method == "flixpatrol_url":
return data
else:
raise Failed(f"FlixPatrol Error: Method {method} not supported")
def get_flixpatrol_ids(self, method, data, language, is_movie):
if method == "flixpatrol_demographics":
logger.info("Processing FlixPatrol Demographics:")
logger.info(f"\tGeneration: {generations_pretty[data['generation']]}")
logger.info(f"\tGender: {'All genders' if data['gender'] == 'all' else data['gender'].capitalize()}")
logger.info(f"\tLocation: {data['location'].replace('_', ' ').title()}")
logger.info(f"\tLimit: {data['limit']}")
elif method == "flixpatrol_popular":
logger.info("Processing FlixPatrol Popular:")
logger.info(f"\tSource: {data['source'].replace('_', ' ').title()}")
logger.info(f"\tTime Window: {data['time_window'].replace('_', ' ').title()}")
logger.info(f"\tLimit: {data['limit']}")
elif method == "flixpatrol_top":
logger.info("Processing FlixPatrol Top:")
logger.info(f"\tPlatform: {data['platform'].replace('_', ' ').title()}")
logger.info(f"\tLocation: {data['location'].replace('_', ' ').title()}")
logger.info(f"\tTime Window: {data['time_window'].replace('_', ' ').title()}")
logger.info(f"\tLimit: {data['limit']}")
elif method == "flixpatrol_url":
logger.info(f"Processing FlixPatrol URL: {data}")
url = self.get_url(method, data, is_movie)
items = self._parse_list(url, language, is_movie)
media_type = "movie" if is_movie else "show"
total_items = len(items)
if total_items > 0:
ids = []
for i, item in enumerate(items, 1):
util.print_return(f"Finding TMDb ID {i}/{total_items}")
tmdb_id = None
expired = None
if self.config.Cache:
tmdb_id, expired = self.config.Cache.query_flixpatrol_map(item, media_type)
if not tmdb_id or expired is not False:
try:
tmdb_id = self._tmdb(f"{base_url}{item}", language)
except Failed as e:
logger.error(e)
continue
if self.config.Cache:
self.config.Cache.update_flixpatrol_map(expired, item, tmdb_id, media_type)
ids.append((tmdb_id, "tmdb" if is_movie else "tmdb_show"))
logger.info(util.adjust_space(f"Processed {total_items} TMDb IDs"))
return ids
else:
raise Failed(f"FlixPatrol Error: No List Items found in {data}")

@ -251,6 +251,27 @@ def is_locked(filepath):
file_object.close() file_object.close()
return locked return locked
def time_window(time_window):
today = datetime.now()
if time_window == "today":
return f"{today:%Y-%m-%d}"
elif time_window == "yesterday":
return f"{today - timedelta(days=1):%Y-%m-%d}"
elif time_window == "this_week":
return f"{today:%Y-0%V}"
elif time_window == "last_week":
return f"{today - timedelta(weeks=1):%Y-0%V}"
elif time_window == "this_month":
return f"{today:%Y-%m}"
elif time_window == "last_month":
return f"{today.year}-{today.month - 1 or 12}"
elif time_window == "this_year":
return f"{today.year}"
elif time_window == "last_year":
return f"{today.year - 1}"
else:
return time_window
def glob_filter(filter_in): def glob_filter(filter_in):
filter_in = filter_in.translate({ord("["): "[[]", ord("]"): "[]]"}) if "[" in filter_in else filter_in filter_in = filter_in.translate({ord("["): "[[]", ord("]"): "[]]"}) if "[" in filter_in else filter_in
return glob.glob(filter_in) return glob.glob(filter_in)

Loading…
Cancel
Save