[92] update YAML read/writes

pull/877/head
meisnate12 3 years ago
parent da6d31c48b
commit 2c9ce0dfb4

@ -1 +1 @@
1.16.5-develop91 1.16.5-develop92

@ -30,4 +30,5 @@ All the following attributes serve various functions as how the collection/playl
| `changes_webhooks` | **Description:** Used to specify a collection/playlist changes webhook for just this collection/playlist.<br>**Values:** List of webhooks | | `changes_webhooks` | **Description:** Used to specify a collection/playlist changes webhook for just this collection/playlist.<br>**Values:** List of webhooks |
| `sync_to_trakt_list` | **Description:** Used to specify a trakt list you want the collection/playlist synced to.<br>**Values:** Trakt List Slug you want to sync to | | `sync_to_trakt_list` | **Description:** Used to specify a trakt list you want the collection/playlist synced to.<br>**Values:** Trakt List Slug you want to sync to |
| `sync_missing_to_trakt_list` | **Description:** Used to also sync missing items to the Trakt List specified by `sync_to_trakt_list`.<br>**Values:** `ture` or `false` | | `sync_missing_to_trakt_list` | **Description:** Used to also sync missing items to the Trakt List specified by `sync_to_trakt_list`.<br>**Values:** `ture` or `false` |
| `allowed_library_types` | **Description:** Used to specify the types of libraries that this definition can work with.<br>Multiple can be used for one definition as a list or comma separated string.<br>**Values:** `movie`, `show`, or `artist` |

@ -26,10 +26,9 @@ from modules.tautulli import Tautulli
from modules.tmdb import TMDb from modules.tmdb import TMDb
from modules.trakt import Trakt from modules.trakt import Trakt
from modules.tvdb import TVDb from modules.tvdb import TVDb
from modules.util import Failed, NotScheduled, NotScheduledRange from modules.util import Failed, NotScheduled, NotScheduledRange, YAML
from modules.webhooks import Webhooks from modules.webhooks import Webhooks
from retrying import retry from retrying import retry
from ruamel import yaml
logger = util.logger logger = util.logger
@ -85,120 +84,113 @@ class ConfigFile:
self.overlays_only = attrs["overlays_only"] if "overlays_only" in attrs else False self.overlays_only = attrs["overlays_only"] if "overlays_only" in attrs else False
current_time = datetime.now() current_time = datetime.now()
yaml.YAML().allow_duplicate_keys = True loaded_yaml = YAML(self.config_path)
try: self.data = loaded_yaml.data
new_config, _, _ = yaml.util.load_yaml_guess_indent(open(self.config_path, encoding="utf-8"))
def replace_attr(all_data, attr, par): def replace_attr(all_data, attr, par):
if "settings" not in all_data: if "settings" not in all_data:
all_data["settings"] = {} all_data["settings"] = {}
if par in all_data and all_data[par] and attr in all_data[par] and attr not in all_data["settings"]: if par in all_data and all_data[par] and attr in all_data[par] and attr not in all_data["settings"]:
all_data["settings"][attr] = all_data[par][attr] all_data["settings"][attr] = all_data[par][attr]
del all_data[par][attr] del all_data[par][attr]
if "libraries" not in new_config: if "libraries" not in self.data:
new_config["libraries"] = {} self.data["libraries"] = {}
if "settings" not in new_config: if "settings" not in self.data:
new_config["settings"] = {} self.data["settings"] = {}
if "tmdb" not in new_config: if "tmdb" not in self.data:
new_config["tmdb"] = {} self.data["tmdb"] = {}
replace_attr(new_config, "cache", "cache") replace_attr(self.data, "cache", "cache")
replace_attr(new_config, "cache_expiration", "cache") replace_attr(self.data, "cache_expiration", "cache")
if "config" in new_config: if "config" in self.data:
del new_config["cache"] del self.data["cache"]
replace_attr(new_config, "asset_directory", "plex") replace_attr(self.data, "asset_directory", "plex")
replace_attr(new_config, "sync_mode", "plex") replace_attr(self.data, "sync_mode", "plex")
replace_attr(new_config, "show_unmanaged", "plex") replace_attr(self.data, "show_unmanaged", "plex")
replace_attr(new_config, "show_filtered", "plex") replace_attr(self.data, "show_filtered", "plex")
replace_attr(new_config, "show_missing", "plex") replace_attr(self.data, "show_missing", "plex")
replace_attr(new_config, "save_missing", "plex") replace_attr(self.data, "save_missing", "plex")
if new_config["libraries"]: if self.data["libraries"]:
for library in new_config["libraries"]: for library in self.data["libraries"]:
if not new_config["libraries"][library]: if not self.data["libraries"][library]:
continue continue
if "radarr_add_all" in new_config["libraries"][library]: if "radarr_add_all" in self.data["libraries"][library]:
new_config["libraries"][library]["radarr_add_all_existing"] = new_config["libraries"][library].pop("radarr_add_all") self.data["libraries"][library]["radarr_add_all_existing"] = self.data["libraries"][library].pop("radarr_add_all")
if "sonarr_add_all" in new_config["libraries"][library]: if "sonarr_add_all" in self.data["libraries"][library]:
new_config["libraries"][library]["sonarr_add_all_existing"] = new_config["libraries"][library].pop("sonarr_add_all") self.data["libraries"][library]["sonarr_add_all_existing"] = self.data["libraries"][library].pop("sonarr_add_all")
if "plex" in new_config["libraries"][library] and new_config["libraries"][library]["plex"]: if "plex" in self.data["libraries"][library] and self.data["libraries"][library]["plex"]:
replace_attr(new_config["libraries"][library], "asset_directory", "plex") replace_attr(self.data["libraries"][library], "asset_directory", "plex")
replace_attr(new_config["libraries"][library], "sync_mode", "plex") replace_attr(self.data["libraries"][library], "sync_mode", "plex")
replace_attr(new_config["libraries"][library], "show_unmanaged", "plex") replace_attr(self.data["libraries"][library], "show_unmanaged", "plex")
replace_attr(new_config["libraries"][library], "show_filtered", "plex") replace_attr(self.data["libraries"][library], "show_filtered", "plex")
replace_attr(new_config["libraries"][library], "show_missing", "plex") replace_attr(self.data["libraries"][library], "show_missing", "plex")
replace_attr(new_config["libraries"][library], "save_missing", "plex") replace_attr(self.data["libraries"][library], "save_missing", "plex")
if "settings" in new_config["libraries"][library] and new_config["libraries"][library]["settings"]: if "settings" in self.data["libraries"][library] and self.data["libraries"][library]["settings"]:
if "collection_minimum" in new_config["libraries"][library]["settings"]: if "collection_minimum" in self.data["libraries"][library]["settings"]:
new_config["libraries"][library]["settings"]["minimum_items"] = new_config["libraries"][library]["settings"].pop("collection_minimum") self.data["libraries"][library]["settings"]["minimum_items"] = self.data["libraries"][library]["settings"].pop("collection_minimum")
if "radarr" in new_config["libraries"][library] and new_config["libraries"][library]["radarr"]: if "radarr" in self.data["libraries"][library] and self.data["libraries"][library]["radarr"]:
if "add" in new_config["libraries"][library]["radarr"]: if "add" in self.data["libraries"][library]["radarr"]:
new_config["libraries"][library]["radarr"]["add_missing"] = new_config["libraries"][library]["radarr"].pop("add") self.data["libraries"][library]["radarr"]["add_missing"] = self.data["libraries"][library]["radarr"].pop("add")
if "sonarr" in new_config["libraries"][library] and new_config["libraries"][library]["sonarr"]: if "sonarr" in self.data["libraries"][library] and self.data["libraries"][library]["sonarr"]:
if "add" in new_config["libraries"][library]["sonarr"]: if "add" in self.data["libraries"][library]["sonarr"]:
new_config["libraries"][library]["sonarr"]["add_missing"] = new_config["libraries"][library]["sonarr"].pop("add") self.data["libraries"][library]["sonarr"]["add_missing"] = self.data["libraries"][library]["sonarr"].pop("add")
if "operations" in new_config["libraries"][library] and new_config["libraries"][library]["operations"]: if "operations" in self.data["libraries"][library] and self.data["libraries"][library]["operations"]:
if "radarr_add_all" in new_config["libraries"][library]["operations"]: if "radarr_add_all" in self.data["libraries"][library]["operations"]:
new_config["libraries"][library]["operations"]["radarr_add_all_existing"] = new_config["libraries"][library]["operations"].pop("radarr_add_all") self.data["libraries"][library]["operations"]["radarr_add_all_existing"] = self.data["libraries"][library]["operations"].pop("radarr_add_all")
if "sonarr_add_all" in new_config["libraries"][library]["operations"]: if "sonarr_add_all" in self.data["libraries"][library]["operations"]:
new_config["libraries"][library]["operations"]["sonarr_add_all_existing"] = new_config["libraries"][library]["operations"].pop("sonarr_add_all") self.data["libraries"][library]["operations"]["sonarr_add_all_existing"] = self.data["libraries"][library]["operations"].pop("sonarr_add_all")
if "webhooks" in new_config["libraries"][library] and new_config["libraries"][library]["webhooks"] and "collection_changes" not in new_config["libraries"][library]["webhooks"]: if "webhooks" in self.data["libraries"][library] and self.data["libraries"][library]["webhooks"] and "collection_changes" not in self.data["libraries"][library]["webhooks"]:
changes = []
def hooks(attr):
if attr in new_config["libraries"][library]["webhooks"]:
changes.extend([w for w in util.get_list(new_config["libraries"][library]["webhooks"].pop(attr), split=False) if w not in changes])
hooks("collection_creation")
hooks("collection_addition")
hooks("collection_removal")
hooks("collection_changes")
new_config["libraries"][library]["webhooks"]["changes"] = None if not changes else changes if len(changes) > 1 else changes[0]
if "libraries" in new_config: new_config["libraries"] = new_config.pop("libraries")
if "playlist_files" in new_config: new_config["playlist_files"] = new_config.pop("playlist_files")
if "settings" in new_config:
temp = new_config.pop("settings")
if "collection_minimum" in temp:
temp["minimum_items"] = temp.pop("collection_minimum")
if "playlist_sync_to_user" in temp:
temp["playlist_sync_to_users"] = temp.pop("playlist_sync_to_user")
new_config["settings"] = temp
if "webhooks" in new_config:
temp = new_config.pop("webhooks")
if "changes" not in temp:
changes = [] changes = []
def hooks(attr): def hooks(attr):
if attr in temp: if attr in self.data["libraries"][library]["webhooks"]:
items = util.get_list(temp.pop(attr), split=False) changes.extend([w for w in util.get_list(self.data["libraries"][library]["webhooks"].pop(attr), split=False) if w not in changes])
if items:
changes.extend([w for w in items if w not in changes])
hooks("collection_creation") hooks("collection_creation")
hooks("collection_addition") hooks("collection_addition")
hooks("collection_removal") hooks("collection_removal")
hooks("collection_changes") hooks("collection_changes")
temp["changes"] = None if not changes else changes if len(changes) > 1 else changes[0] self.data["libraries"][library]["webhooks"]["changes"] = None if not changes else changes if len(changes) > 1 else changes[0]
new_config["webhooks"] = temp if "libraries" in self.data: self.data["libraries"] = self.data.pop("libraries")
if "plex" in new_config: new_config["plex"] = new_config.pop("plex") if "playlist_files" in self.data: self.data["playlist_files"] = self.data.pop("playlist_files")
if "tmdb" in new_config: new_config["tmdb"] = new_config.pop("tmdb") if "settings" in self.data:
if "tautulli" in new_config: new_config["tautulli"] = new_config.pop("tautulli") temp = self.data.pop("settings")
if "omdb" in new_config: new_config["omdb"] = new_config.pop("omdb") if "collection_minimum" in temp:
if "mdblist" in new_config: new_config["mdblist"] = new_config.pop("mdblist") temp["minimum_items"] = temp.pop("collection_minimum")
if "notifiarr" in new_config: new_config["notifiarr"] = new_config.pop("notifiarr") if "playlist_sync_to_user" in temp:
if "anidb" in new_config: new_config["anidb"] = new_config.pop("anidb") temp["playlist_sync_to_users"] = temp.pop("playlist_sync_to_user")
if "radarr" in new_config: self.data["settings"] = temp
temp = new_config.pop("radarr") if "webhooks" in self.data:
if temp and "add" in temp: temp = self.data.pop("webhooks")
temp["add_missing"] = temp.pop("add") if "changes" not in temp:
new_config["radarr"] = temp changes = []
if "sonarr" in new_config: def hooks(attr):
temp = new_config.pop("sonarr") if attr in temp:
if temp and "add" in temp: items = util.get_list(temp.pop(attr), split=False)
temp["add_missing"] = temp.pop("add") if items:
new_config["sonarr"] = temp changes.extend([w for w in items if w not in changes])
if "trakt" in new_config: new_config["trakt"] = new_config.pop("trakt") hooks("collection_creation")
if "mal" in new_config: new_config["mal"] = new_config.pop("mal") hooks("collection_addition")
self.data = new_config hooks("collection_removal")
except yaml.scanner.ScannerError as e: hooks("collection_changes")
logger.stacktrace() temp["changes"] = None if not changes else changes if len(changes) > 1 else changes[0]
raise Failed(f"YAML Error: {util.tab_new_lines(e)}") self.data["webhooks"] = temp
except Exception as e: if "plex" in self.data: self.data["plex"] = self.data.pop("plex")
logger.stacktrace() if "tmdb" in self.data: self.data["tmdb"] = self.data.pop("tmdb")
raise Failed(f"YAML Error: {e}") if "tautulli" in self.data: self.data["tautulli"] = self.data.pop("tautulli")
if "omdb" in self.data: self.data["omdb"] = self.data.pop("omdb")
if "mdblist" in self.data: self.data["mdblist"] = self.data.pop("mdblist")
if "notifiarr" in self.data: self.data["notifiarr"] = self.data.pop("notifiarr")
if "anidb" in self.data: self.data["anidb"] = self.data.pop("anidb")
if "radarr" in self.data:
temp = self.data.pop("radarr")
if temp and "add" in temp:
temp["add_missing"] = temp.pop("add")
self.data["radarr"] = temp
if "sonarr" in self.data:
temp = self.data.pop("sonarr")
if temp and "add" in temp:
temp["add_missing"] = temp.pop("add")
self.data["sonarr"] = temp
if "trakt" in self.data: self.data["trakt"] = self.data.pop("trakt")
if "mal" in self.data: self.data["mal"] = self.data.pop("mal")
def check_for_attribute(data, attribute, parent=None, test_list=None, default=None, do_print=True, default_is_none=False, req_default=False, var_type="str", throw=False, save=True): def check_for_attribute(data, attribute, parent=None, test_list=None, default=None, do_print=True, default_is_none=False, req_default=False, var_type="str", throw=False, save=True):
endline = "" endline = ""
@ -215,12 +207,12 @@ class ConfigFile:
if data is None or attribute not in data: if data is None or attribute not in data:
message = f"{text} not found" message = f"{text} not found"
if parent and save is True: if parent and save is True:
loaded_config, _, _ = yaml.util.load_yaml_guess_indent(open(self.config_path, encoding="utf-8")) yaml = YAML(self.config_path)
endline = f"\n{parent} sub-attribute {attribute} added to config" endline = f"\n{parent} sub-attribute {attribute} added to config"
if parent not in loaded_config or not loaded_config[parent]: loaded_config[parent] = {attribute: default} if parent not in yaml.data or not yaml.data[parent]: yaml.data[parent] = {attribute: default}
elif attribute not in loaded_config[parent]: loaded_config[parent][attribute] = default elif attribute not in yaml.data[parent]: yaml.data[parent][attribute] = default
else: endline = "" else: endline = ""
yaml.round_trip_dump(loaded_config, open(self.config_path, "w"), block_seq_indent=2) yaml.save()
if default_is_none and var_type in ["list", "int_list", "comma_list"]: return default if default else [] if default_is_none and var_type in ["list", "int_list", "comma_list"]: return default if default else []
elif data[attribute] is None: elif data[attribute] is None:
if default_is_none and var_type in ["list", "int_list", "comma_list"]: return default if default else [] if default_is_none and var_type in ["list", "int_list", "comma_list"]: return default if default else []

@ -3,8 +3,7 @@ from abc import ABC, abstractmethod
from modules import util from modules import util
from modules.meta import MetadataFile, OverlayFile from modules.meta import MetadataFile, OverlayFile
from modules.operations import Operations from modules.operations import Operations
from modules.util import Failed from modules.util import Failed, YAML
from ruamel import yaml
logger = util.logger logger = util.logger
@ -258,10 +257,9 @@ class Library(ABC):
self.report_data[collection][other].append(title) self.report_data[collection][other].append(title)
with open(self.report_path, "w"): pass with open(self.report_path, "w"): pass
try: yaml = YAML(self.report_path)
yaml.round_trip_dump(self.report_data, open(self.report_path, "w", encoding="utf-8")) yaml.data = self.report_data
except yaml.scanner.ScannerError as e: yaml.save()
logger.error(f"YAML Error: {util.tab_new_lines(e)}")
def cache_items(self): def cache_items(self):
logger.info("") logger.info("")

@ -1,7 +1,6 @@
import math, re, secrets, time, webbrowser import re, secrets, time, webbrowser
from modules import util from modules import util
from modules.util import Failed, TimeoutExpired from modules.util import Failed, TimeoutExpired, YAML
from ruamel import yaml
logger = util.logger logger = util.logger
@ -138,16 +137,15 @@ class MyAnimeList:
def _save(self, authorization): def _save(self, authorization):
if authorization is not None and "access_token" in authorization and authorization["access_token"] and self._check(authorization): if authorization is not None and "access_token" in authorization and authorization["access_token"] and self._check(authorization):
if self.authorization != authorization and not self.config.read_only: if self.authorization != authorization and not self.config.read_only:
yaml.YAML().allow_duplicate_keys = True yaml = YAML(self.config_path)
config, ind, bsi = yaml.util.load_yaml_guess_indent(open(self.config_path, encoding="utf-8")) yaml.data["mal"]["authorization"] = {
config["mal"]["authorization"] = {
"access_token": authorization["access_token"], "access_token": authorization["access_token"],
"token_type": authorization["token_type"], "token_type": authorization["token_type"],
"expires_in": authorization["expires_in"], "expires_in": authorization["expires_in"],
"refresh_token": authorization["refresh_token"] "refresh_token": authorization["refresh_token"]
} }
logger.info(f"Saving authorization information to {self.config_path}") logger.info(f"Saving authorization information to {self.config_path}")
yaml.round_trip_dump(config, open(self.config_path, "w"), indent=ind, block_seq_indent=bsi) yaml.save()
logger.secret(authorization["access_token"]) logger.secret(authorization["access_token"])
self.authorization = authorization self.authorization = authorization
return True return True

@ -1,9 +1,8 @@
import math, operator, os, re, requests import math, operator, os, re, requests
from datetime import datetime from datetime import datetime
from modules import plex, ergast, util from modules import plex, ergast, util
from modules.util import Failed from modules.util import Failed, YAML
from plexapi.exceptions import NotFound, BadRequest from plexapi.exceptions import NotFound, BadRequest
from ruamel import yaml
logger = util.logger logger = util.logger
@ -81,28 +80,19 @@ class DataFile:
return data return data
def load_file(self, file_type, file_path): def load_file(self, file_type, file_path):
try: if file_type in ["URL", "Git", "Repo"]:
if file_type in ["URL", "Git", "Repo"]: if file_type == "Repo" and not self.config.custom_repo:
if file_type == "Repo" and not self.config.custom_repo: raise Failed("Config Error: No custom_repo defined")
raise Failed("Config Error: No custom_repo defined") content_path = file_path if file_type == "URL" else f"{self.config.custom_repo if file_type == 'Repo' else util.github_base}{file_path}.yml"
content_path = file_path if file_type == "URL" else f"{self.config.custom_repo if file_type == 'Repo' else util.github_base}{file_path}.yml" response = self.config.get(content_path)
response = self.config.get(content_path) if response.status_code >= 400:
if response.status_code >= 400: raise Failed(f"URL Error: No file found at {content_path}")
raise Failed(f"URL Error: No file found at {content_path}") yaml = YAML(input_data=response.content, check_empty=True)
content = response.content elif os.path.exists(os.path.abspath(file_path)):
elif os.path.exists(os.path.abspath(file_path)): yaml = YAML(path=os.path.abspath(file_path), check_empty=True)
content = open(file_path, encoding="utf-8") else:
else: raise Failed(f"File Error: File does not exist {os.path.abspath(file_path)}")
raise Failed(f"File Error: File does not exist {os.path.abspath(file_path)}") return yaml.data
data, _, _ = yaml.util.load_yaml_guess_indent(content)
except yaml.scanner.ScannerError as ye:
raise Failed(f"YAML Error: {util.tab_new_lines(ye)}")
except Exception as e:
logger.stacktrace()
raise Failed(f"YAML Error: {e}")
if not data or not isinstance(data, dict):
raise Failed("YAML Error: File is empty")
return data
def apply_template(self, name, data, template_call): def apply_template(self, name, data, template_call):
if not self.templates: if not self.templates:

@ -1,10 +1,9 @@
import os, re import os, re
from datetime import datetime from datetime import datetime
from modules import plex, util from modules import plex, util
from modules.util import Failed from modules.util import Failed, YAML
from plexapi.audio import Artist from plexapi.audio import Artist
from plexapi.video import Show from plexapi.video import Show
from ruamel import yaml
logger = util.logger logger = util.logger
@ -455,24 +454,24 @@ class Operations:
logger.info("") logger.info("")
logger.info(f"Metadata Backup Path: {self.library.metadata_backup['path']}") logger.info(f"Metadata Backup Path: {self.library.metadata_backup['path']}")
logger.info("") logger.info("")
meta = None yaml = None
if os.path.exists(self.library.metadata_backup["path"]): if os.path.exists(self.library.metadata_backup["path"]):
try: try:
meta, _, _ = yaml.util.load_yaml_guess_indent(open(self.library.metadata_backup["path"], encoding="utf-8")) yaml = YAML(path=self.library.metadata_backup["path"])
except yaml.scanner.ScannerError as e: except Failed as e:
logger.error(f"YAML Error: {util.tab_new_lines(e)}") logger.error(e)
filename, file_extension = os.path.splitext(self.library.metadata_backup["path"]) filename, file_extension = os.path.splitext(self.library.metadata_backup["path"])
i = 1 i = 1
while os.path.exists(f"{filename}{i}{file_extension}"): while os.path.exists(f"{filename}{i}{file_extension}"):
i += 1 i += 1
os.rename(self.library.metadata_backup["path"], f"{filename}{i}{file_extension}") os.rename(self.library.metadata_backup["path"], f"{filename}{i}{file_extension}")
logger.error(f"Backup failed to load saving copy to {filename}{i}{file_extension}") logger.error(f"Backup failed to load saving copy to {filename}{i}{file_extension}")
if not meta: if not yaml:
meta = {} yaml = YAML(path=self.library.metadata_backup["path"], create=True)
if "metadata" not in meta: if "metadata" not in yaml.data:
meta["metadata"] = {} yaml.data["metadata"] = {}
special_names = {} special_names = {}
for mk, mv in meta["metadata"].items(): for mk, mv in yaml.data["metadata"].items():
if "title" in mv: if "title" in mv:
special_names[mv["title"]] = mk special_names[mv["title"]] = mk
if "year" in mv: if "year" in mv:
@ -484,7 +483,7 @@ class Operations:
map_key, attrs = self.library.get_locked_attributes(item, titles) map_key, attrs = self.library.get_locked_attributes(item, titles)
if map_key in special_names: if map_key in special_names:
map_key = special_names[map_key] map_key = special_names[map_key]
og_dict = meta["metadata"][map_key] if map_key in meta["metadata"] and meta["metadata"][map_key] else {} og_dict = yaml.data["metadata"][map_key] if map_key in yaml.data["metadata"] and yaml.data["metadata"][map_key] else {}
if attrs or (self.library.metadata_backup["add_blank_entries"] and not og_dict): if attrs or (self.library.metadata_backup["add_blank_entries"] and not og_dict):
def get_dict(attrs_dict): def get_dict(attrs_dict):
return {ak: get_dict(av) if isinstance(av, dict) else av for ak, av in attrs_dict.items()} return {ak: get_dict(av) if isinstance(av, dict) else av for ak, av in attrs_dict.items()}
@ -494,13 +493,10 @@ class Operations:
for lk, lv in looping.items(): for lk, lv in looping.items():
dest_dict[lk] = loop_dict(lv, dest_dict[lk] if lk in dest_dict and dest_dict[lk] else {}) if isinstance(lv, dict) else lv dest_dict[lk] = loop_dict(lv, dest_dict[lk] if lk in dest_dict and dest_dict[lk] else {}) if isinstance(lv, dict) else lv
return dest_dict return dest_dict
meta["metadata"][map_key] = loop_dict(get_dict(attrs), og_dict) yaml.data["metadata"][map_key] = loop_dict(get_dict(attrs), og_dict)
logger.exorcise() logger.exorcise()
try: yaml.save()
yaml.round_trip_dump(meta, open(self.library.metadata_backup["path"], "w", encoding="utf-8"), block_seq_indent=2) logger.info(f"{len(yaml.data['metadata'])} {self.library.type.capitalize()}{'s' if len(yaml.data['metadata']) > 1 else ''} Backed Up")
logger.info(f"{len(meta['metadata'])} {self.library.type.capitalize()}{'s' if len(meta['metadata']) > 1 else ''} Backed Up")
except yaml.scanner.ScannerError as e:
logger.error(f"YAML Error: {util.tab_new_lines(e)}")
operation_run_time = str(datetime.now() - operation_start).split('.')[0] operation_run_time = str(datetime.now() - operation_start).split('.')[0]
logger.info("") logger.info("")

@ -1,8 +1,7 @@
import requests, time, webbrowser import requests, time, webbrowser
from modules import util from modules import util
from modules.util import Failed, TimeoutExpired from modules.util import Failed, TimeoutExpired, YAML
from retrying import retry from retrying import retry
from ruamel import yaml
logger = util.logger logger = util.logger
@ -171,10 +170,9 @@ class Trakt:
def _save(self, authorization): def _save(self, authorization):
if authorization and self._check(authorization): if authorization and self._check(authorization):
if self.authorization != authorization and not self.config.read_only: if self.authorization != authorization and not self.config.read_only:
yaml.YAML().allow_duplicate_keys = True yaml = YAML(self.config_path)
config, ind, bsi = yaml.util.load_yaml_guess_indent(open(self.config_path, encoding="utf-8")) yaml.data["trakt"]["pin"] = None
config["trakt"]["pin"] = None yaml.data["trakt"]["authorization"] = {
config["trakt"]["authorization"] = {
"access_token": authorization["access_token"], "access_token": authorization["access_token"],
"token_type": authorization["token_type"], "token_type": authorization["token_type"],
"expires_in": authorization["expires_in"], "expires_in": authorization["expires_in"],
@ -183,7 +181,7 @@ class Trakt:
"created_at": authorization["created_at"] "created_at": authorization["created_at"]
} }
logger.info(f"Saving authorization information to {self.config_path}") logger.info(f"Saving authorization information to {self.config_path}")
yaml.round_trip_dump(config, open(self.config_path, "w"), indent=ind, block_seq_indent=bsi) yaml.save()
self.authorization = authorization self.authorization = authorization
return True return True
return False return False

@ -1,4 +1,4 @@
import glob, logging, os, re, requests, signal, sys, time import glob, logging, os, re, requests, ruamel.yaml, signal, sys, time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathvalidate import is_valid_filename, sanitize_filename from pathvalidate import is_valid_filename, sanitize_filename
from plexapi.audio import Album, Track from plexapi.audio import Album, Track
@ -89,9 +89,6 @@ github_base = "https://raw.githubusercontent.com/meisnate12/Plex-Meta-Manager-Co
previous_time = None previous_time = None
start_time = None start_time = None
def tab_new_lines(data):
return str(data).replace("\n", "\n ") if "\n" in str(data) else str(data)
def make_ordinal(n): def make_ordinal(n):
return f"{n}{'th' if 11 <= (n % 100) <= 13 else ['th', 'st', 'nd', 'rd', 'th'][min(n % 10, 4)]}" return f"{n}{'th' if 11 <= (n % 100) <= 13 else ['th', 'st', 'nd', 'rd', 'th'][min(n % 10, 4)]}"
@ -778,3 +775,35 @@ def check_time(message, end=False):
else: else:
logger.debug(f"{message}: {current_time - previous_time}") logger.debug(f"{message}: {current_time - previous_time}")
previous_time = None if end else current_time previous_time = None if end else current_time
class YAML:
def __init__(self, path=None, input_data=None, check_empty=False, create=False):
self.path = path
self.input_data = input_data
self.yaml = ruamel.yaml.YAML()
self.yaml.indent(mapping=2, sequence=2)
try:
if input_data:
self.data = self.yaml.load(input_data)
else:
if create and not os.path.exists(self.path):
with open(self.path, 'w'):
pass
self.data = {}
else:
with open(self.path, encoding="utf-8") as fp:
self.data = self.yaml.load(fp)
except ruamel.yaml.error.YAMLError as e:
e = str(e).replace("\n", "\n ")
raise Failed(f"YAML Error: {e}")
except Exception as e:
raise Failed(f"YAML Error: {e}")
if not self.data or not isinstance(self.data, dict):
if check_empty:
raise Failed("YAML Error: File is empty")
self.data = {}
def save(self):
if self.path:
with open(self.path, 'w') as fp:
self.yaml.dump(self.data, fp)

@ -6,7 +6,6 @@ try:
from modules.logs import MyLogger from modules.logs import MyLogger
from plexapi.exceptions import NotFound from plexapi.exceptions import NotFound
from plexapi.video import Show, Season from plexapi.video import Show, Season
from ruamel import yaml
except ModuleNotFoundError: except ModuleNotFoundError:
print("Requirements Error: Requirements are not installed") print("Requirements Error: Requirements are not installed")
sys.exit(0) sys.exit(0)

Loading…
Cancel
Save