Backups and Snapshots - Data directory now fully portable, (all paths are relative) , refactored backup zip export creation

playwright-conditional-install
dgtlmoon 2 years ago committed by GitHub
parent 724cb17224
commit 72834a42fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -987,9 +987,6 @@ def changedetection_app(config=None, datastore_o=None):
# create a ZipFile object # create a ZipFile object
backupname = "changedetection-backup-{}.zip".format(int(time.time())) backupname = "changedetection-backup-{}.zip".format(int(time.time()))
# We only care about UUIDS from the current index file
uuids = list(datastore.data['watching'].keys())
backup_filepath = os.path.join(datastore_o.datastore_path, backupname) backup_filepath = os.path.join(datastore_o.datastore_path, backupname)
with zipfile.ZipFile(backup_filepath, "w", with zipfile.ZipFile(backup_filepath, "w",
@ -1005,12 +1002,12 @@ def changedetection_app(config=None, datastore_o=None):
# Add the flask app secret # Add the flask app secret
zipObj.write(os.path.join(datastore_o.datastore_path, "secret.txt"), arcname="secret.txt") zipObj.write(os.path.join(datastore_o.datastore_path, "secret.txt"), arcname="secret.txt")
# Add any snapshot data we find, use the full path to access the file, but make the file 'relative' in the Zip. # Add any data in the watch data directory.
for txt_file_path in Path(datastore_o.datastore_path).rglob('*.txt'): for uuid, w in datastore.data['watching'].items():
parent_p = txt_file_path.parent for f in Path(w.watch_data_dir).glob('*'):
if parent_p.name in uuids: zipObj.write(f,
zipObj.write(txt_file_path, # Use the full path to access the file, but make the file 'relative' in the Zip.
arcname=str(txt_file_path).replace(datastore_o.datastore_path, ''), arcname=os.path.join(f.parts[-2], f.parts[-1]),
compress_type=zipfile.ZIP_DEFLATED, compress_type=zipfile.ZIP_DEFLATED,
compresslevel=8) compresslevel=8)

Binary file not shown.

@ -1,6 +1,8 @@
import os
import uuid as uuid_builder
from distutils.util import strtobool from distutils.util import strtobool
import logging
import os
import time
import uuid
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 60)) minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 60))
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7} mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
@ -22,7 +24,7 @@ class model(dict):
#'newest_history_key': 0, #'newest_history_key': 0,
'title': None, 'title': None,
'previous_md5': False, 'previous_md5': False,
'uuid': str(uuid_builder.uuid4()), 'uuid': str(uuid.uuid4()),
'headers': {}, # Extra headers to send 'headers': {}, # Extra headers to send
'body': None, 'body': None,
'method': 'GET', 'method': 'GET',
@ -60,7 +62,7 @@ class model(dict):
self.update(self.__base_config) self.update(self.__base_config)
self.__datastore_path = kw['datastore_path'] self.__datastore_path = kw['datastore_path']
self['uuid'] = str(uuid_builder.uuid4()) self['uuid'] = str(uuid.uuid4())
del kw['datastore_path'] del kw['datastore_path']
@ -82,10 +84,9 @@ class model(dict):
return False return False
def ensure_data_dir_exists(self): def ensure_data_dir_exists(self):
target_path = os.path.join(self.__datastore_path, self['uuid']) if not os.path.isdir(self.watch_data_dir):
if not os.path.isdir(target_path): print ("> Creating data dir {}".format(self.watch_data_dir))
print ("> Creating data dir {}".format(target_path)) os.mkdir(self.watch_data_dir)
os.mkdir(target_path)
@property @property
def label(self): def label(self):
@ -109,18 +110,39 @@ class model(dict):
@property @property
def history(self): def history(self):
"""History index is just a text file as a list
{watch-uuid}/history.txt
contains a list like
{epoch-time},{filename}\n
We read in this list as the history information
"""
tmp_history = {} tmp_history = {}
import logging
import time
# Read the history file as a dict # Read the history file as a dict
fname = os.path.join(self.__datastore_path, self.get('uuid'), "history.txt") fname = os.path.join(self.watch_data_dir, "history.txt")
if os.path.isfile(fname): if os.path.isfile(fname):
logging.debug("Reading history index " + str(time.time())) logging.debug("Reading history index " + str(time.time()))
with open(fname, "r") as f: with open(fname, "r") as f:
for i in f.readlines(): for i in f.readlines():
if ',' in i: if ',' in i:
k, v = i.strip().split(',', 2) k, v = i.strip().split(',', 2)
# The index history could contain a relative path, so we need to make the fullpath
# so that python can read it
if not '/' in v and not '\'' in v:
v = os.path.join(self.watch_data_dir, v)
else:
# It's possible that they moved the datadir on older versions
# So the snapshot exists but is in a different path
snapshot_fname = v.split('/')[-1]
proposed_new_path = os.path.join(self.watch_data_dir, snapshot_fname)
if not os.path.exists(v) and os.path.exists(proposed_new_path):
v = proposed_new_path
tmp_history[k] = v tmp_history[k] = v
if len(tmp_history): if len(tmp_history):
@ -132,7 +154,7 @@ class model(dict):
@property @property
def has_history(self): def has_history(self):
fname = os.path.join(self.__datastore_path, self.get('uuid'), "history.txt") fname = os.path.join(self.watch_data_dir, "history.txt")
return os.path.isfile(fname) return os.path.isfile(fname)
# Returns the newest key, but if theres only 1 record, then it's counted as not being new, so return 0. # Returns the newest key, but if theres only 1 record, then it's counted as not being new, so return 0.
@ -151,25 +173,19 @@ class model(dict):
# Save some text file to the appropriate path and bump the history # Save some text file to the appropriate path and bump the history
# result_obj from fetch_site_status.run() # result_obj from fetch_site_status.run()
def save_history_text(self, contents, timestamp): def save_history_text(self, contents, timestamp):
import uuid
import logging
output_path = os.path.join(self.__datastore_path, self['uuid'])
self.ensure_data_dir_exists() self.ensure_data_dir_exists()
snapshot_fname = os.path.join(output_path, str(uuid.uuid4())) snapshot_fname = "{}.txt".format(str(uuid.uuid4()))
logging.debug("Saving history text {}".format(snapshot_fname))
# in /diff/ and /preview/ we are going to assume for now that it's UTF-8 when reading # in /diff/ and /preview/ we are going to assume for now that it's UTF-8 when reading
# most sites are utf-8 and some are even broken utf-8 # most sites are utf-8 and some are even broken utf-8
with open(snapshot_fname, 'wb') as f: with open(os.path.join(self.watch_data_dir, snapshot_fname), 'wb') as f:
f.write(contents) f.write(contents)
f.close() f.close()
# Append to index # Append to index
# @todo check last char was \n # @todo check last char was \n
index_fname = os.path.join(output_path, "history.txt") index_fname = os.path.join(self.watch_data_dir, "history.txt")
with open(index_fname, 'a') as f: with open(index_fname, 'a') as f:
f.write("{},{}\n".format(timestamp, snapshot_fname)) f.write("{},{}\n".format(timestamp, snapshot_fname))
f.close() f.close()
@ -210,14 +226,14 @@ class model(dict):
return not local_lines.issubset(existing_history) return not local_lines.issubset(existing_history)
def get_screenshot(self): def get_screenshot(self):
fname = os.path.join(self.__datastore_path, self['uuid'], "last-screenshot.png") fname = os.path.join(self.watch_data_dir, "last-screenshot.png")
if os.path.isfile(fname): if os.path.isfile(fname):
return fname return fname
return False return False
def __get_file_ctime(self, filename): def __get_file_ctime(self, filename):
fname = os.path.join(self.__datastore_path, self['uuid'], filename) fname = os.path.join(self.watch_data_dir, filename)
if os.path.isfile(fname): if os.path.isfile(fname):
return int(os.path.getmtime(fname)) return int(os.path.getmtime(fname))
return False return False
@ -242,9 +258,14 @@ class model(dict):
def snapshot_error_screenshot_ctime(self): def snapshot_error_screenshot_ctime(self):
return self.__get_file_ctime('last-error-screenshot.png') return self.__get_file_ctime('last-error-screenshot.png')
@property
def watch_data_dir(self):
# The base dir of the watch data
return os.path.join(self.__datastore_path, self['uuid'])
def get_error_text(self): def get_error_text(self):
"""Return the text saved from a previous request that resulted in a non-200 error""" """Return the text saved from a previous request that resulted in a non-200 error"""
fname = os.path.join(self.__datastore_path, self['uuid'], "last-error.txt") fname = os.path.join(self.watch_data_dir, "last-error.txt")
if os.path.isfile(fname): if os.path.isfile(fname):
with open(fname, 'r') as f: with open(fname, 'r') as f:
return f.read() return f.read()
@ -252,7 +273,7 @@ class model(dict):
def get_error_snapshot(self): def get_error_snapshot(self):
"""Return path to the screenshot that resulted in a non-200 error""" """Return path to the screenshot that resulted in a non-200 error"""
fname = os.path.join(self.__datastore_path, self['uuid'], "last-error-screenshot.png") fname = os.path.join(self.watch_data_dir, "last-error-screenshot.png")
if os.path.isfile(fname): if os.path.isfile(fname):
return fname return fname
return False return False

@ -1,18 +1,31 @@
#!/usr/bin/python3 #!/usr/bin/python3
import time from .util import set_original_response, set_modified_response, live_server_setup
from flask import url_for from flask import url_for
from urllib.request import urlopen from urllib.request import urlopen
from . util import set_original_response, set_modified_response, live_server_setup from zipfile import ZipFile
import re
import time
def test_backup(client, live_server): def test_backup(client, live_server):
live_server_setup(live_server) live_server_setup(live_server)
set_original_response()
# Give the endpoint time to spin up # Give the endpoint time to spin up
time.sleep(1) time.sleep(1)
# Add our URL to the import page
res = client.post(
url_for("import_page"),
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
time.sleep(3)
res = client.get( res = client.get(
url_for("get_backup"), url_for("get_backup"),
follow_redirects=True follow_redirects=True
@ -20,6 +33,19 @@ def test_backup(client, live_server):
# Should get the right zip content type # Should get the right zip content type
assert res.content_type == "application/zip" assert res.content_type == "application/zip"
# Should be PK/ZIP stream # Should be PK/ZIP stream
assert res.data.count(b'PK') >= 2 assert res.data.count(b'PK') >= 2
# ZipFile from buffer seems non-obvious, just save it instead
with open("download.zip", 'wb') as f:
f.write(res.data)
zip = ZipFile('download.zip')
l = zip.namelist()
uuid4hex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.*txt', re.I)
newlist = list(filter(uuid4hex.match, l)) # Read Note below
# Should be two txt files in the archive (history and the snapshot)
assert len(newlist) == 2

Loading…
Cancel
Save