Merge branch 'diff-filters' of https://github.com/bwees/changedetection.io into diff-filters

diff-proposed-for-bwees
dgtlmoon 2 years ago
commit 3e7fd9570a

@ -987,9 +987,6 @@ def changedetection_app(config=None, datastore_o=None):
# create a ZipFile object # create a ZipFile object
backupname = "changedetection-backup-{}.zip".format(int(time.time())) backupname = "changedetection-backup-{}.zip".format(int(time.time()))
# We only care about UUIDS from the current index file
uuids = list(datastore.data['watching'].keys())
backup_filepath = os.path.join(datastore_o.datastore_path, backupname) backup_filepath = os.path.join(datastore_o.datastore_path, backupname)
with zipfile.ZipFile(backup_filepath, "w", with zipfile.ZipFile(backup_filepath, "w",
@ -1005,12 +1002,12 @@ def changedetection_app(config=None, datastore_o=None):
# Add the flask app secret # Add the flask app secret
zipObj.write(os.path.join(datastore_o.datastore_path, "secret.txt"), arcname="secret.txt") zipObj.write(os.path.join(datastore_o.datastore_path, "secret.txt"), arcname="secret.txt")
# Add any snapshot data we find, use the full path to access the file, but make the file 'relative' in the Zip. # Add any data in the watch data directory.
for txt_file_path in Path(datastore_o.datastore_path).rglob('*.txt'): for uuid, w in datastore.data['watching'].items():
parent_p = txt_file_path.parent for f in Path(w.watch_data_dir).glob('*'):
if parent_p.name in uuids: zipObj.write(f,
zipObj.write(txt_file_path, # Use the full path to access the file, but make the file 'relative' in the Zip.
arcname=str(txt_file_path).replace(datastore_o.datastore_path, ''), arcname=os.path.join(f.parts[-2], f.parts[-1]),
compress_type=zipfile.ZIP_DEFLATED, compress_type=zipfile.ZIP_DEFLATED,
compresslevel=8) compresslevel=8)

Binary file not shown.

@ -2,15 +2,15 @@ import hashlib
import logging import logging
import os import os
import re import re
import time
import urllib3 import urllib3
import difflib import difflib
import requests
import json
from changedetectionio import content_fetcher, html_tools from changedetectionio import content_fetcher, html_tools
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Some common stuff here that can be moved to a base class # Some common stuff here that can be moved to a base class
# (set_proxy_from_list) # (set_proxy_from_list)
class perform_site_check(): class perform_site_check():
@ -36,6 +36,8 @@ class perform_site_check():
def run(self, uuid): def run(self, uuid):
from jinja2 import Environment
changed_detected = False changed_detected = False
screenshot = False # as bytes screenshot = False # as bytes
stripped_text_from_html = "" stripped_text_from_html = ""
@ -57,6 +59,19 @@ class perform_site_check():
# Tweak the base config with the per-watch ones # Tweak the base config with the per-watch ones
request_headers = self.datastore.data['settings']['headers'].copy() request_headers = self.datastore.data['settings']['headers'].copy()
if self.datastore.data['watching'][uuid].get('external_header_server') is not None and self.datastore.data['watching'][uuid].get('external_header_server') != "" and self.datastore.data['watching'][uuid].get('external_header_server') != "None":
try:
resp = requests.get(self.datastore.data['watching'][uuid].get('external_header_server'))
if resp.status_code != 200:
raise Exception("External header server returned non-200 response. Please check the URL for the server")
data = json.loads(resp.text.strip())
request_headers.update(resp.json())
except json.decoder.JSONDecodeError:
raise Exception("Failed to decode JSON response from external header server")
request_headers.update(extra_headers) request_headers.update(extra_headers)
# https://github.com/psf/requests/issues/4525 # https://github.com/psf/requests/issues/4525
@ -66,7 +81,11 @@ class perform_site_check():
request_headers['Accept-Encoding'] = request_headers['Accept-Encoding'].replace(', br', '') request_headers['Accept-Encoding'] = request_headers['Accept-Encoding'].replace(', br', '')
timeout = self.datastore.data['settings']['requests'].get('timeout') timeout = self.datastore.data['settings']['requests'].get('timeout')
url = watch.get('url')
# Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/
jinja2_env = Environment(extensions=['jinja2_time.TimeExtension'])
url = str(jinja2_env.from_string(watch.get('url')).render())
request_body = self.datastore.data['watching'][uuid].get('body') request_body = self.datastore.data['watching'][uuid].get('body')
request_method = self.datastore.data['watching'][uuid].get('method') request_method = self.datastore.data['watching'][uuid].get('method')
ignore_status_codes = self.datastore.data['watching'][uuid].get('ignore_status_codes', False) ignore_status_codes = self.datastore.data['watching'][uuid].get('ignore_status_codes', False)

@ -370,6 +370,7 @@ class watchForm(commonSettingsForm):
title = StringField('Title', default='') title = StringField('Title', default='')
ignore_text = StringListField('Ignore text', [ValidateListRegex()]) ignore_text = StringListField('Ignore text', [ValidateListRegex()])
external_header_server = fields.URLField('External Header Server', validators=[validators.Optional(), validateURL()])
headers = StringDictKeyValue('Request headers') headers = StringDictKeyValue('Request headers')
body = TextAreaField('Request body', [validators.Optional()]) body = TextAreaField('Request body', [validators.Optional()])
method = SelectField('Request method', choices=valid_method, default=default_method) method = SelectField('Request method', choices=valid_method, default=default_method)

@ -1,6 +1,8 @@
import os
import uuid as uuid_builder
from distutils.util import strtobool from distutils.util import strtobool
import logging
import os
import time
import uuid
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 60)) minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 60))
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7} mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
@ -22,8 +24,9 @@ class model(dict):
#'newest_history_key': 0, #'newest_history_key': 0,
'title': None, 'title': None,
'previous_md5': False, 'previous_md5': False,
'uuid': str(uuid_builder.uuid4()), 'uuid': str(uuid.uuid4()),
'headers': {}, # Extra headers to send 'headers': {}, # Extra headers to send
'external_header_server': None, # URL to a server that will return headers
'body': None, 'body': None,
'method': 'GET', 'method': 'GET',
#'history': {}, # Dict of timestamp and output stripped filename #'history': {}, # Dict of timestamp and output stripped filename
@ -62,7 +65,7 @@ class model(dict):
self.update(self.__base_config) self.update(self.__base_config)
self.__datastore_path = kw['datastore_path'] self.__datastore_path = kw['datastore_path']
self['uuid'] = str(uuid_builder.uuid4()) self['uuid'] = str(uuid.uuid4())
del kw['datastore_path'] del kw['datastore_path']
@ -84,10 +87,9 @@ class model(dict):
return False return False
def ensure_data_dir_exists(self): def ensure_data_dir_exists(self):
target_path = os.path.join(self.__datastore_path, self['uuid']) if not os.path.isdir(self.watch_data_dir):
if not os.path.isdir(target_path): print ("> Creating data dir {}".format(self.watch_data_dir))
print ("> Creating data dir {}".format(target_path)) os.mkdir(self.watch_data_dir)
os.mkdir(target_path)
@property @property
def label(self): def label(self):
@ -111,18 +113,39 @@ class model(dict):
@property @property
def history(self): def history(self):
"""History index is just a text file as a list
{watch-uuid}/history.txt
contains a list like
{epoch-time},{filename}\n
We read in this list as the history information
"""
tmp_history = {} tmp_history = {}
import logging
import time
# Read the history file as a dict # Read the history file as a dict
fname = os.path.join(self.__datastore_path, self.get('uuid'), "history.txt") fname = os.path.join(self.watch_data_dir, "history.txt")
if os.path.isfile(fname): if os.path.isfile(fname):
logging.debug("Reading history index " + str(time.time())) logging.debug("Reading history index " + str(time.time()))
with open(fname, "r") as f: with open(fname, "r") as f:
for i in f.readlines(): for i in f.readlines():
if ',' in i: if ',' in i:
k, v = i.strip().split(',', 2) k, v = i.strip().split(',', 2)
# The index history could contain a relative path, so we need to make the fullpath
# so that python can read it
if not '/' in v and not '\'' in v:
v = os.path.join(self.watch_data_dir, v)
else:
# It's possible that they moved the datadir on older versions
# So the snapshot exists but is in a different path
snapshot_fname = v.split('/')[-1]
proposed_new_path = os.path.join(self.watch_data_dir, snapshot_fname)
if not os.path.exists(v) and os.path.exists(proposed_new_path):
v = proposed_new_path
tmp_history[k] = v tmp_history[k] = v
if len(tmp_history): if len(tmp_history):
@ -134,7 +157,7 @@ class model(dict):
@property @property
def has_history(self): def has_history(self):
fname = os.path.join(self.__datastore_path, self.get('uuid'), "history.txt") fname = os.path.join(self.watch_data_dir, "history.txt")
return os.path.isfile(fname) return os.path.isfile(fname)
# Returns the newest key, but if theres only 1 record, then it's counted as not being new, so return 0. # Returns the newest key, but if theres only 1 record, then it's counted as not being new, so return 0.
@ -153,25 +176,19 @@ class model(dict):
# Save some text file to the appropriate path and bump the history # Save some text file to the appropriate path and bump the history
# result_obj from fetch_site_status.run() # result_obj from fetch_site_status.run()
def save_history_text(self, contents, timestamp): def save_history_text(self, contents, timestamp):
import uuid
import logging
output_path = os.path.join(self.__datastore_path, self['uuid'])
self.ensure_data_dir_exists() self.ensure_data_dir_exists()
snapshot_fname = os.path.join(output_path, str(uuid.uuid4())) snapshot_fname = "{}.txt".format(str(uuid.uuid4()))
logging.debug("Saving history text {}".format(snapshot_fname))
# in /diff/ and /preview/ we are going to assume for now that it's UTF-8 when reading # in /diff/ and /preview/ we are going to assume for now that it's UTF-8 when reading
# most sites are utf-8 and some are even broken utf-8 # most sites are utf-8 and some are even broken utf-8
with open(snapshot_fname, 'wb') as f: with open(os.path.join(self.watch_data_dir, snapshot_fname), 'wb') as f:
f.write(contents) f.write(contents)
f.close() f.close()
# Append to index # Append to index
# @todo check last char was \n # @todo check last char was \n
index_fname = os.path.join(output_path, "history.txt") index_fname = os.path.join(self.watch_data_dir, "history.txt")
with open(index_fname, 'a') as f: with open(index_fname, 'a') as f:
f.write("{},{}\n".format(timestamp, snapshot_fname)) f.write("{},{}\n".format(timestamp, snapshot_fname))
f.close() f.close()
@ -270,14 +287,14 @@ class model(dict):
return diff_types return diff_types
def get_screenshot(self): def get_screenshot(self):
fname = os.path.join(self.__datastore_path, self['uuid'], "last-screenshot.png") fname = os.path.join(self.watch_data_dir, "last-screenshot.png")
if os.path.isfile(fname): if os.path.isfile(fname):
return fname return fname
return False return False
def __get_file_ctime(self, filename): def __get_file_ctime(self, filename):
fname = os.path.join(self.__datastore_path, self['uuid'], filename) fname = os.path.join(self.watch_data_dir, filename)
if os.path.isfile(fname): if os.path.isfile(fname):
return int(os.path.getmtime(fname)) return int(os.path.getmtime(fname))
return False return False
@ -302,9 +319,14 @@ class model(dict):
def snapshot_error_screenshot_ctime(self): def snapshot_error_screenshot_ctime(self):
return self.__get_file_ctime('last-error-screenshot.png') return self.__get_file_ctime('last-error-screenshot.png')
@property
def watch_data_dir(self):
# Base directory for this watch's data: the datastore path joined with this watch's 'uuid' value
return os.path.join(self.__datastore_path, self['uuid'])
def get_error_text(self): def get_error_text(self):
"""Return the text saved from a previous request that resulted in a non-200 error""" """Return the text saved from a previous request that resulted in a non-200 error"""
fname = os.path.join(self.__datastore_path, self['uuid'], "last-error.txt") fname = os.path.join(self.watch_data_dir, "last-error.txt")
if os.path.isfile(fname): if os.path.isfile(fname):
with open(fname, 'r') as f: with open(fname, 'r') as f:
return f.read() return f.read()
@ -312,7 +334,7 @@ class model(dict):
def get_error_snapshot(self): def get_error_snapshot(self):
"""Return path to the screenshot that resulted in a non-200 error""" """Return path to the screenshot that resulted in a non-200 error"""
fname = os.path.join(self.__datastore_path, self['uuid'], "last-error-screenshot.png") fname = os.path.join(self.watch_data_dir, "last-error-screenshot.png")
if os.path.isfile(fname): if os.path.isfile(fname):
return fname return fname
return False return False

@ -40,7 +40,8 @@
<fieldset> <fieldset>
<div class="pure-control-group"> <div class="pure-control-group">
{{ render_field(form.url, placeholder="https://...", required=true, class="m-d") }} {{ render_field(form.url, placeholder="https://...", required=true, class="m-d") }}
<span class="pure-form-message-inline">Some sites use JavaScript to create the content, for this you should <a href="https://github.com/dgtlmoon/changedetection.io/wiki/Fetching-pages-with-WebDriver">use the Chrome/WebDriver Fetcher</a></span> <span class="pure-form-message-inline">Some sites use JavaScript to create the content, for this you should <a href="https://github.com/dgtlmoon/changedetection.io/wiki/Fetching-pages-with-WebDriver">use the Chrome/WebDriver Fetcher</a></span><br/>
<span class="pure-form-message-inline">You can use variables in the URL, perfect for inserting the current date and other logic, <a href="https://github.com/dgtlmoon/changedetection.io/wiki/Handling-variables-in-the-watched-URL">help and examples here</a></span><br/>
</div> </div>
<div class="pure-control-group"> <div class="pure-control-group">
{{ render_field(form.title, class="m-d") }} {{ render_field(form.title, class="m-d") }}
@ -119,6 +120,12 @@
<div class="pure-control-group" id="request-method"> <div class="pure-control-group" id="request-method">
{{ render_field(form.method) }} {{ render_field(form.method) }}
</div> </div>
<div class="pure-control-group" id="external-header-server">
{{ render_field(form.external_header_server, placeholder="http://example.com/watch1") }}
<div class="pure-form-message-inline">
The watch will perform a GET request before each check to this URL and will use the headers in addition to the ones listed below and in global settings. <a href="https://github.com/dgtlmoon/changedetection.io/wiki/Run-JavaScript-before-change-detection">More help and examples here</a>
</div>
</div>
<div class="pure-control-group" id="request-headers"> <div class="pure-control-group" id="request-headers">
{{ render_field(form.headers, rows=5, placeholder="Example {{ render_field(form.headers, rows=5, placeholder="Example
Cookie: foobar Cookie: foobar

@ -23,7 +23,7 @@ def test_basic_auth(client, live_server):
# Check form validation # Check form validation
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid="first"),
data={"css_filter": "", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"}, data={"css_filter": "", "url": test_url, "tag": "", "headers": "", "external_header_server": "", 'fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
) )
assert b"Updated watch." in res.data assert b"Updated watch." in res.data

@ -1,18 +1,31 @@
#!/usr/bin/python3 #!/usr/bin/python3
import time from .util import set_original_response, set_modified_response, live_server_setup
from flask import url_for from flask import url_for
from urllib.request import urlopen from urllib.request import urlopen
from . util import set_original_response, set_modified_response, live_server_setup from zipfile import ZipFile
import re
import time
def test_backup(client, live_server): def test_backup(client, live_server):
live_server_setup(live_server) live_server_setup(live_server)
set_original_response()
# Give the endpoint time to spin up # Give the endpoint time to spin up
time.sleep(1) time.sleep(1)
# Add our URL to the import page
res = client.post(
url_for("import_page"),
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
time.sleep(3)
res = client.get( res = client.get(
url_for("get_backup"), url_for("get_backup"),
follow_redirects=True follow_redirects=True
@ -20,6 +33,19 @@ def test_backup(client, live_server):
# Should get the right zip content type # Should get the right zip content type
assert res.content_type == "application/zip" assert res.content_type == "application/zip"
# Should be PK/ZIP stream # Should be PK/ZIP stream
assert res.data.count(b'PK') >= 2 assert res.data.count(b'PK') >= 2
# ZipFile from buffer seems non-obvious, just save it instead
with open("download.zip", 'wb') as f:
f.write(res.data)
zip = ZipFile('download.zip')
l = zip.namelist()
uuid4hex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.*txt', re.I)
newlist = list(filter(uuid4hex.match, l)) # Read Note below
# Should be two txt files in the archive (history and the snapshot)
assert len(newlist) == 2

@ -98,7 +98,7 @@ def test_check_markup_css_filter_restriction(client, live_server):
# Add our URL to the import page # Add our URL to the import page
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid="first"),
data={"css_filter": css_filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"}, data={"css_filter": css_filter, "url": test_url, "tag": "", "headers": "", "external_header_server": "",'fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
) )
assert b"Updated watch." in res.data assert b"Updated watch." in res.data

@ -114,7 +114,7 @@ def test_403_page_check_works_with_ignore_status_code(client, live_server):
# Add our URL to the import page # Add our URL to the import page
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid="first"),
data={"ignore_status_codes": "y", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"}, data={"ignore_status_codes": "y", "url": test_url, "tag": "", "headers": "", "external_header_server": "",'fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
) )
assert b"Updated watch." in res.data assert b"Updated watch." in res.data

@ -0,0 +1,33 @@
#!/usr/bin/python3
import time
from flask import url_for
from .util import live_server_setup
# Jinja2 template expressions placed in the watched URL should be rendered before the URL is requested
def test_jinja2_in_url_query(client, live_server):
    """Check that Jinja2 tags in a watch URL's query string end up rendered.

    A watch is added whose query string contains ``{% now %}`` tags, and the
    preview page is then expected to contain ``date=2`` (a rendered year)
    rather than only the raw template text.
    """
    live_server_setup(live_server)

    # Let the live server come up before talking to it
    time.sleep(1)

    # The template is appended by hand: passing it through url_for() would
    # URL-encode the Jinja2 tags, which is not wanted here.
    endpoint = url_for('test_return_query', _external=True)
    date_template = "date={% now 'Europe/Berlin', '%Y' %}.{% now 'Europe/Berlin', '%m' %}.{% now 'Europe/Berlin', '%d' %}"
    watch_url = "{}?{}".format(endpoint, date_template)

    # Register the watch through the quick-add form
    form_data = {"url": watch_url, "tag": "test"}
    add_response = client.post(
        url_for("form_quick_watch_add"),
        data=form_data,
        follow_redirects=True,
    )
    assert b"Watch added" in add_response.data

    # Allow time for the newly added watch to be fetched
    time.sleep(3)

    preview = client.get(
        url_for("preview_page", uuid="first"),
        follow_redirects=True,
    )
    # A rendered year starts with "2"; the unrendered template would give "date={%"
    assert b'date=2' in preview.data

@ -29,7 +29,7 @@ def test_share_watch(client, live_server):
# Add our URL to the import page # Add our URL to the import page
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid="first"),
data={"css_filter": css_filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"}, data={"css_filter": css_filter, "url": test_url, "tag": "", "headers": "", "external_header_server": "",'fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
) )
assert b"Updated watch." in res.data assert b"Updated watch." in res.data

@ -89,7 +89,7 @@ def test_check_xpath_filter_utf8(client, live_server):
time.sleep(1) time.sleep(1)
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid="first"),
data={"css_filter": filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"}, data={"css_filter": filter, "url": test_url, "tag": "", "headers": "", "external_header_server": "",'fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
) )
assert b"Updated watch." in res.data assert b"Updated watch." in res.data
@ -143,7 +143,7 @@ def test_check_xpath_text_function_utf8(client, live_server):
time.sleep(1) time.sleep(1)
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid="first"),
data={"css_filter": filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"}, data={"css_filter": filter, "url": test_url, "tag": "", "headers": "", "external_header_server": "",'fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
) )
assert b"Updated watch." in res.data assert b"Updated watch." in res.data
@ -192,7 +192,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
# Add our URL to the import page # Add our URL to the import page
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid="first"),
data={"css_filter": xpath_filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"}, data={"css_filter": xpath_filter, "url": test_url, "tag": "", "headers": "", "external_header_server": "",'fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
) )
assert b"Updated watch." in res.data assert b"Updated watch." in res.data
@ -233,7 +233,7 @@ def test_xpath_validation(client, live_server):
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid="first"),
data={"css_filter": "/something horrible", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"}, data={"css_filter": "/something horrible", "url": test_url, "tag": "", "headers": "", "external_header_server": "",'fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
) )
assert b"is not a valid XPath expression" in res.data assert b"is not a valid XPath expression" in res.data
@ -263,7 +263,7 @@ def test_check_with_prefix_css_filter(client, live_server):
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid="first"),
data={"css_filter": "xpath://*[contains(@class, 'sametext')]", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"}, data={"css_filter": "xpath://*[contains(@class, 'sametext')]", "url": test_url, "tag": "", "headers": "", "external_header_server": "",'fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
) )

@ -196,5 +196,11 @@ def live_server_setup(live_server):
return self.app(environ, start_response) return self.app(environ, start_response)
live_server.app.wsgi_app = DefaultCheckboxMiddleware(live_server.app.wsgi_app) live_server.app.wsgi_app = DefaultCheckboxMiddleware(live_server.app.wsgi_app)
# Test-only endpoint: responds with the raw query string of the incoming GET request
@live_server.app.route('/test-return-query', methods=['GET'])
def test_return_query():
return request.query_string
live_server.start() live_server.start()

@ -46,5 +46,9 @@ selenium ~= 4.1.0
# need to revisit flask login versions # need to revisit flask login versions
werkzeug ~= 2.0.0 werkzeug ~= 2.0.0
# Templating, so far just in the URLs but in the future can be for the notifications also
jinja2
jinja2-time
# playwright is installed at Dockerfile build time because it's not available on all platforms # playwright is installed at Dockerfile build time because it's not available on all platforms

Loading…
Cancel
Save