You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Plex-Meta-Manager/modules/request.py

280 lines
11 KiB

import base64, os, ruamel.yaml, requests
from lxml import html
from modules import util
from modules.poster import ImageData
from modules.util import Failed
from requests.exceptions import ConnectionError
from tenacity import retry, stop_after_attempt, wait_fixed
from urllib import parse
# Logger object taken from modules.util; used in this module for error output and download progress.
logger = util.logger
# MIME types accepted as image responses.
# NOTE(review): the Requests class keeps its own copy of this list and this module-level name is not
# referenced by the visible code — presumably imported by other modules; confirm before removing.
image_content_types = ["image/png", "image/jpeg", "image/webp"]
def get_header(headers, header, language):
    """Pick the HTTP headers to send with a request.

    Args:
        headers: Explicit header mapping; returned unchanged when truthy.
        header: When truthy and no ``language`` is given, the default
            ``"en-US,en;q=0.5"`` Accept-Language value is used.
        language: Accept-Language value to send. The special value
            ``"default"`` is sent as ``"eng"``.

    Returns:
        The header mapping to use, or ``None`` when neither explicit headers
        nor a language/default header was requested.
    """
    if headers:
        return headers
    if header and not language:
        language = "en-US,en;q=0.5"
    if not language:
        # Nothing requested: let the session defaults apply.
        return None
    return {
        "Accept-Language": "eng" if language == "default" else language,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:109.0) Gecko/20100101 Firefox/113.0"
    }
def quote(data):
    """Percent-encode ``data`` (converted to ``str``) with ``urllib.parse.quote``."""
    return parse.quote(str(data))
def quote_plus(data):
    """Percent-encode ``data`` (converted to ``str``) with ``urllib.parse.quote_plus``."""
    return parse.quote_plus(str(data))
def parse_qs(data):
    """Parse a query string into a dict of value lists via ``urllib.parse.parse_qs``."""
    return parse.parse_qs(data)
def urlparse(data):
    """Split ``data`` (converted to ``str``) into URL components via ``urllib.parse.urlparse``."""
    return parse.urlparse(str(data))
class Version:
    """Parsed application version string.

    Attributes:
        full: The version string with every "develop" replaced by "build".
        main: The part of ``full`` before "-build".
        build: Integer after "-build", or 0 when there is none.
        part: Integer parsed from ``part_string``, or 0 when it is empty.
    """

    def __init__(self, version_string="Unknown", part_string=""):
        """Parse ``version_string`` (and the optional ``part_string``).

        Raises:
            ValueError: If the build suffix or ``part_string`` is not an integer.
        """
        self.full = version_string.replace("develop", "build")
        version_parts = self.full.split("-build")
        self.main = version_parts[0]
        self.build = 0
        self.part = int(part_string) if part_string else 0
        if len(version_parts) > 1:
            self.build = int(version_parts[1])

    def __bool__(self):
        # A version is falsy only when it is the "Unknown" placeholder.
        return self.full != "Unknown"

    def __repr__(self):
        return str(self)

    def __str__(self):
        # The part number is appended only when it is non-zero.
        return f"{self.full}.{self.part}" if self.part else self.full
class Requests:
    """HTTP helper built around a shared ``requests.Session``.

    Provides retrying GET/POST wrappers, convenience fetchers for
    HTML/JSON/YAML/images, and lazily cached lookups of the VERSION file
    published on the master, develop and nightly branches of the project
    repository.
    """

    def __init__(self, local, part, env_branch, git_branch, verify_ssl=True):
        """Store the local version/branch info and create the shared session.

        Args:
            local: Local version string, passed to ``Version``.
            part: Part string, passed to ``Version``.
            env_branch: Branch name supplied by the environment.
            git_branch: Branch name supplied by git.
            verify_ssl: When false, certificate verification is disabled on
                the shared session.
        """
        self.local = Version(local, part)
        self.env_branch = env_branch
        self.git_branch = git_branch
        self.image_content_types = ["image/png", "image/jpeg", "image/webp"]
        # Lazily populated caches for the properties below.
        self._nightly = None
        self._develop = None
        self._master = None
        self._branch = None
        self._latest = None
        self._newest = None
        self.session = self.create_session()
        self.global_ssl = verify_ssl
        if not self.global_ssl:
            self.no_verify_ssl()

    def create_session(self, verify_ssl=True):
        """Return a new ``requests.Session``, with verification disabled if requested."""
        session = requests.Session()
        if not verify_ssl:
            self.no_verify_ssl(session)
        return session

    def no_verify_ssl(self, session=None):
        """Disable certificate verification on ``session`` (default: the shared session).

        Also silences urllib3's ``InsecureRequestWarning``.
        """
        if session is None:
            session = self.session
        session.verify = False
        import urllib3
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    def download_image(self, title, image_url, download_directory, session=None, is_poster=True, filename=None):
        """Download ``image_url`` to disk and return an ``ImageData`` describing the file.

        The target path is ``download_directory/filename`` when ``filename``
        is given, otherwise ``download_directory`` itself; an extension is
        appended based on the response Content-Type.

        Raises:
            Failed: Propagated from ``get_image`` on HTTP errors or non-image content.
        """
        response = self.get_image(image_url, session=session)
        new_image = os.path.join(download_directory, f"{filename}") if filename else download_directory
        content_type = response.headers["Content-Type"]
        if content_type == "image/jpeg":
            new_image += ".jpg"
        elif content_type == "image/webp":
            new_image += ".webp"
        else:
            new_image += ".png"
        with open(new_image, "wb") as handler:
            handler.write(response.content)
        return ImageData("asset_directory", new_image, prefix=f"{title}'s ", is_poster=is_poster, is_url=False)

    def file_yaml(self, path_to_file, check_empty=False, create=False, start_empty=False):
        """Return a ``YAML`` wrapper for the file at ``path_to_file``."""
        return YAML(path=path_to_file, check_empty=check_empty, create=create, start_empty=start_empty)

    def get_yaml(self, url, headers=None, params=None, check_empty=False):
        """Fetch ``url`` and parse the response body as YAML.

        Raises:
            Failed: On any HTTP status of 400 or above, or when parsing fails.
        """
        response = self.get(url, headers=headers, params=params)
        if response.status_code == 401:
            raise Failed(f"URL Error: Unauthorized - {url}")
        if response.status_code == 404:
            raise Failed(f"URL Error: No file found at {url}")
        if response.status_code == 429:
            raise Failed(f"URL Error: Too many requests - {url}")
        if response.status_code >= 400:
            raise Failed(f"URL Error: {response.status_code} on {url}")
        return YAML(input_data=response.content, check_empty=check_empty)

    def get_image(self, url, session=None):
        """Fetch ``url`` and verify that the response is a supported image.

        When ``session`` is given the request is sent directly through it
        (bypassing the retrying ``get`` wrapper); otherwise ``self.get`` is used.

        Raises:
            Failed: On HTTP status of 400 or above, or when the Content-Type
                is missing or not one of ``self.image_content_types``.
        """
        if session is None:
            response = self.get(url, header=True)
        else:
            response = session.get(url, headers=get_header(None, True, None))
        if response.status_code == 404:
            raise Failed(f"Image Error: Not Found on Image URL: {url}")
        if response.status_code >= 400:
            raise Failed(f"Image Error: {response.status_code} on Image URL: {url}")
        if "Content-Type" not in response.headers or response.headers["Content-Type"] not in self.image_content_types:
            raise Failed("Image Not PNG, JPG, or WEBP")
        return response

    def get_stream(self, url, location, info="Item"):
        """Stream ``url`` into the file at ``location`` while logging progress.

        Progress is logged as a percentage when the server reports a positive
        content-length, and as a running byte count otherwise.

        Raises:
            requests.HTTPError: From ``raise_for_status`` on an error response.
        """
        with self.session.get(url, stream=True) as r:
            r.raise_for_status()
            total_length = r.headers.get('content-length')
            # The header is optional; treat a missing value as "unknown size" (0)
            # so the progress calculation never divides by None or zero.
            total_length = int(total_length) if total_length is not None else 0
            dl = 0
            with open(location, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    dl += len(chunk)
                    f.write(chunk)
                    if total_length > 0:
                        logger.ghost(f"Downloading {info}: {dl / total_length * 100:6.2f}%")
                    else:
                        logger.ghost(f"Downloading {info}: {dl} bytes")
                logger.exorcise()

    def get_html(self, url, headers=None, params=None, header=None, language=None):
        """GET ``url`` and return the body parsed with ``lxml.html.fromstring``."""
        return html.fromstring(self.get(url, headers=headers, params=params, header=header, language=language).content)

    def get_json(self, url, json=None, headers=None, params=None, header=None, language=None):
        """GET ``url`` and return the decoded JSON body.

        Raises:
            ValueError: When the body is not valid JSON (the raw body is logged first).
        """
        response = self.get(url, json=json, headers=headers, params=params, header=header, language=language)
        try:
            return response.json()
        except ValueError:
            logger.error(str(response.content))
            raise

    @retry(stop=stop_after_attempt(6), wait=wait_fixed(10))
    def get(self, url, json=None, headers=None, params=None, header=None, language=None):
        """GET ``url`` through the shared session; retried up to 6 attempts, 10 seconds apart."""
        return self.session.get(url, json=json, headers=get_header(headers, header, language), params=params)

    def get_image_encoded(self, url):
        """GET ``url`` and return its body as a base64-encoded UTF-8 string."""
        return base64.b64encode(self.get(url).content).decode('utf-8')

    def post_html(self, url, data=None, json=None, headers=None, header=None, language=None):
        """POST to ``url`` and return the body parsed with ``lxml.html.fromstring``."""
        return html.fromstring(self.post(url, data=data, json=json, headers=headers, header=header, language=language).content)

    def post_json(self, url, data=None, json=None, headers=None, header=None, language=None):
        """POST to ``url`` and return the decoded JSON body.

        Raises:
            ValueError: When the body is not valid JSON (the raw body is logged first).
        """
        response = self.post(url, data=data, json=json, headers=headers, header=header, language=language)
        try:
            return response.json()
        except ValueError:
            logger.error(str(response.content))
            raise

    @retry(stop=stop_after_attempt(6), wait=wait_fixed(10))
    def post(self, url, data=None, json=None, headers=None, header=None, language=None):
        """POST to ``url`` through the shared session; retried up to 6 attempts, 10 seconds apart."""
        return self.session.post(url, data=data, json=json, headers=get_header(headers, header, language))

    def has_new_version(self):
        """Return True when the latest published version differs from or is newer than the local one.

        Returns False when either the local or the latest version is unknown.
        """
        # Check the local version first so an unknown local version never
        # triggers a remote lookup.
        if not self.local:
            return False
        latest = self.latest
        if not latest:
            return False
        if self.local.main != latest.main:
            return True
        return bool(self.local.build and self.local.build < latest.build)

    @property
    def branch(self):
        """Branch this install tracks: "develop", "nightly" or "master" (cached)."""
        if self._branch is None:
            if self.git_branch in ["develop", "nightly"]:
                self._branch = self.git_branch
            elif self.env_branch in ["develop", "nightly"]:
                self._branch = self.env_branch
            elif self.local.build > 0:
                if self.local.main != self.develop.main or self.local.build <= self.develop.build:
                    self._branch = "develop"
                else:
                    self._branch = "nightly"
            else:
                self._branch = "master"
        return self._branch

    @property
    def latest(self):
        """Latest published ``Version`` for the tracked branch (cached)."""
        if self._latest is None:
            if self.branch == "develop":
                self._latest = self.develop
            elif self.branch == "nightly":
                self._latest = self.nightly
            elif self.local.build > 0:
                # Safeguard mirroring the develop/nightly rule used by `branch`
                # (which only yields "master" when the local build is 0).
                if self.local.main != self.develop.main or self.develop.build >= self.local.build:
                    self._latest = self.develop
                else:
                    self._latest = self.nightly
            else:
                self._latest = self.master
        return self._latest

    @property
    def newest(self):
        """The latest ``Version`` when it is newer than the local one, else None."""
        if self._newest is None:
            latest = self.latest
            is_newer = latest and (
                self.local.main != latest.main
                or (self.local.build and self.local.build < latest.build)
            )
            self._newest = latest if is_newer else None
        return self._newest

    @property
    def master(self):
        """``Version`` published on the master branch (cached)."""
        if self._master is None:
            self._master = self._version("master")
        return self._master

    @property
    def develop(self):
        """``Version`` published on the develop branch (cached)."""
        if self._develop is None:
            self._develop = self._version("develop")
        return self._develop

    @property
    def nightly(self):
        """``Version`` published on the nightly branch (cached)."""
        if self._nightly is None:
            self._nightly = self._version("nightly")
        return self._nightly

    def _version(self, level):
        """Fetch the VERSION file of branch ``level``; return an unknown ``Version`` when unreachable."""
        # `self.get` is wrapped by tenacity's retry without `reraise`, so an
        # exhausted retry surfaces as RetryError rather than ConnectionError.
        from tenacity import RetryError
        try:
            url = f"https://raw.githubusercontent.com/Kometa-Team/Kometa/{level}/VERSION"
            return Version(self.get(url).content.decode().strip())
        except (ConnectionError, RetryError):
            return Version()
class YAML:
    """Thin wrapper around a ``ruamel.yaml.YAML`` loader/dumper.

    Attributes:
        path: File path the data was loaded from and is saved to (may be None).
        input_data: Raw YAML content supplied instead of a file (may be None).
        yaml: The configured ``ruamel.yaml.YAML`` instance.
        data: The loaded mapping; always a dict after construction.
    """

    def __init__(self, path=None, input_data=None, check_empty=False, create=False, start_empty=False):
        """Load YAML from ``input_data`` or from the file at ``path``.

        Args:
            path: File to read (and later write with ``save``).
            input_data: YAML content to parse; takes precedence over ``path`` when truthy.
            check_empty: Raise ``Failed`` when the loaded document is empty or not a mapping.
            create: Create an empty file when ``path`` does not exist.
            start_empty: Truncate/create the file at ``path`` and start from an empty mapping.

        Raises:
            Failed: When loading fails, or when ``check_empty`` is set and the
                document is empty or not a mapping.
        """
        self.path = path
        self.input_data = input_data
        self.yaml = ruamel.yaml.YAML()
        self.yaml.width = 100000
        self.yaml.indent(mapping=2, sequence=2)
        try:
            if input_data:
                self.data = self.yaml.load(input_data)
            elif start_empty or (create and not os.path.exists(self.path)):
                # Create (or truncate) the file and start from an empty mapping.
                with open(self.path, 'w'):
                    pass
                self.data = {}
            else:
                with open(self.path, encoding="utf-8") as fp:
                    self.data = self.yaml.load(fp)
        except ruamel.yaml.error.YAMLError as e:
            # Not every YAMLError carries a `problem` string; guard the lookup so
            # a failure here cannot mask the original parsing error.
            problem = getattr(e, "problem", None) or ""
            if "found character '\\t' that cannot start any token" in problem:
                mark = e.args[3]
                location = f"{mark.name}; line {mark.line + 1} column {mark.column + 1}"
                message = f"Tabs are not allowed in YAML files; only spaces are allowed.\nfirst tab character found at:\n{location}"
            else:
                message = str(e).replace("\n", "\n ")
            raise Failed(f"YAML Error: {message}")
        except Exception as e:
            raise Failed(f"YAML Error: {e}")
        if not self.data or not isinstance(self.data, dict):
            if check_empty:
                raise Failed("YAML Error: File is empty")
            self.data = {}

    def save(self):
        """Write ``self.data`` back to ``self.path``; does nothing when no path is set."""
        if self.path:
            with open(self.path, 'w', encoding="utf-8") as fp:
                self.yaml.dump(self.data, fp)