Merge branch 'master' of https://github.com/dgtlmoon/changedetection.io into ui-improvements

pull/317/head
ntmmfts 3 years ago
commit 3b02b89a63

@ -16,6 +16,8 @@ Open source web page monitoring, notification and change detection.
[![Deploy](https://www.herokucdn.com/deploy/button.svg)](https://dashboard.heroku.com/new?template=https%3A%2F%2Fgithub.com%2Fdgtlmoon%2Fchangedetection.io%2Ftree%2Fmaster)
Read the [Heroku notes and limitations wiki page first](https://github.com/dgtlmoon/changedetection.io/wiki/Heroku-notes)
#### Example use cases
- Products and services have a change in pricing
@ -27,6 +29,7 @@ Open source web page monitoring, notification and change detection.
- University/organisation news from their website
- Detect and monitor changes in JSON API responses
- API monitoring and alerting
- Changes in legal and other documents
- Trigger API calls via notifications when text appears on a website
- Glue together APIs using the JSON filter and JSON notifications
- Create RSS feeds based on changes in web content
@ -88,6 +91,8 @@ docker run -d --restart always -p "127.0.0.1:5000:5000" -v datastore-volume:/dat
```bash
docker-compose pull && docker-compose up -d
```
### Filters
XPath, JSONPath and CSS support comes baked in! You can be as specific as you need, use XPath exported from various XPath element query creation tools.
### Notifications
@ -141,9 +146,9 @@ When you enable a `json:` filter, you can even automatically extract and parse e
See the wiki https://github.com/dgtlmoon/changedetection.io/wiki/Proxy-configuration
### RaspberriPi support?
### Raspberry Pi support?
RaspberriPi and linux/arm/v6 linux/arm/v7 arm64 devices are supported!
Raspberry Pi and linux/arm/v6 linux/arm/v7 arm64 devices are supported!
### Windows native support?

@ -30,7 +30,7 @@ import datetime
import pytz
from copy import deepcopy
__version__ = '0.39.5'
__version__ = '0.39.6'
datastore = None
@ -400,7 +400,7 @@ def changedetection_app(config=None, datastore_o=None):
# Get the most recent one
newest_history_key = datastore.get_val(uuid, 'newest_history_key')
# 0 means that theres only one, so that there should be no 'unviewed' history availabe
# 0 means that theres only one, so that there should be no 'unviewed' history available
if newest_history_key == 0:
newest_history_key = list(datastore.data['watching'][uuid]['history'].keys())[0]
@ -413,7 +413,11 @@ def changedetection_app(config=None, datastore_o=None):
stripped_content = handler.strip_ignore_text(raw_content,
datastore.data['watching'][uuid]['ignore_text'])
checksum = hashlib.md5(stripped_content).hexdigest()
if datastore.data['settings']['application'].get('ignore_whitespace', False):
checksum = hashlib.md5(stripped_content.translate(None, b'\r\n\t ')).hexdigest()
else:
checksum = hashlib.md5(stripped_content).hexdigest()
return checksum
return datastore.data['watching'][uuid]['previous_md5']
@ -545,6 +549,8 @@ def changedetection_app(config=None, datastore_o=None):
if request.method == 'GET':
form.minutes_between_check.data = int(datastore.data['settings']['requests']['minutes_between_check'])
form.notification_urls.data = datastore.data['settings']['application']['notification_urls']
form.global_ignore_text.data = datastore.data['settings']['application']['global_ignore_text']
form.ignore_whitespace.data = datastore.data['settings']['application']['ignore_whitespace']
form.extract_title_as_title.data = datastore.data['settings']['application']['extract_title_as_title']
form.fetch_backend.data = datastore.data['settings']['application']['fetch_backend']
form.notification_title.data = datastore.data['settings']['application']['notification_title']
@ -571,6 +577,8 @@ def changedetection_app(config=None, datastore_o=None):
datastore.data['settings']['application']['notification_format'] = form.notification_format.data
datastore.data['settings']['application']['notification_urls'] = form.notification_urls.data
datastore.data['settings']['application']['base_url'] = form.base_url.data
datastore.data['settings']['application']['global_ignore_text'] = form.global_ignore_text.data
datastore.data['settings']['application']['ignore_whitespace'] = form.ignore_whitespace.data
if form.trigger_check.data:
if len(form.notification_urls.data):
@ -861,7 +869,8 @@ def changedetection_app(config=None, datastore_o=None):
from pathlib import Path
# Remove any existing backup file, for now we just keep one file
for previous_backup_filename in Path(app.config['datastore_path']).rglob('changedetection-backup-*.zip'):
for previous_backup_filename in Path(datastore_o.datastore_path).rglob('changedetection-backup-*.zip'):
os.unlink(previous_backup_filename)
# create a ZipFile object
@ -869,7 +878,7 @@ def changedetection_app(config=None, datastore_o=None):
# We only care about UUIDS from the current index file
uuids = list(datastore.data['watching'].keys())
backup_filepath = os.path.join(app.config['datastore_path'], backupname)
backup_filepath = os.path.join(datastore_o.datastore_path, backupname)
with zipfile.ZipFile(backup_filepath, "w",
compression=zipfile.ZIP_DEFLATED,
@ -879,22 +888,22 @@ def changedetection_app(config=None, datastore_o=None):
datastore.sync_to_json()
# Add the index
zipObj.write(os.path.join(app.config['datastore_path'], "url-watches.json"), arcname="url-watches.json")
zipObj.write(os.path.join(datastore_o.datastore_path, "url-watches.json"), arcname="url-watches.json")
# Add the flask app secret
zipObj.write(os.path.join(app.config['datastore_path'], "secret.txt"), arcname="secret.txt")
zipObj.write(os.path.join(datastore_o.datastore_path, "secret.txt"), arcname="secret.txt")
# Add any snapshot data we find, use the full path to access the file, but make the file 'relative' in the Zip.
for txt_file_path in Path(app.config['datastore_path']).rglob('*.txt'):
for txt_file_path in Path(datastore_o.datastore_path).rglob('*.txt'):
parent_p = txt_file_path.parent
if parent_p.name in uuids:
zipObj.write(txt_file_path,
arcname=str(txt_file_path).replace(app.config['datastore_path'], ''),
arcname=str(txt_file_path).replace(datastore_o.datastore_path, ''),
compress_type=zipfile.ZIP_DEFLATED,
compresslevel=8)
# Create a list file with just the URLs, so it's easier to port somewhere else in the future
list_file = os.path.join(app.config['datastore_path'], "url-list.txt")
list_file = os.path.join(datastore_o.datastore_path, "url-list.txt")
with open(list_file, "w") as f:
for uuid in datastore.data['watching']:
url = datastore.data['watching'][uuid]['url']
@ -906,7 +915,8 @@ def changedetection_app(config=None, datastore_o=None):
compress_type=zipfile.ZIP_DEFLATED,
compresslevel=8)
return send_from_directory(app.config['datastore_path'], backupname, as_attachment=True)
# Send_from_directory needs to be the full absolute path
return send_from_directory(os.path.abspath(datastore_o.datastore_path), backupname, as_attachment=True)
@app.route("/static/<string:group>/<string:filename>", methods=['GET'])
def static_content(group, filename):

@ -9,12 +9,19 @@ import urllib3.exceptions
class EmptyReply(Exception):
def __init__(self, status_code, url):
# Set this so we can use it in other parts of the app
self.status_code = status_code
self.url = url
return
pass
class Fetcher():
error = None
status_code = None
content = None # Should be bytes?
content = None # Should always be bytes.
headers = None
fetcher_description ="No description"
@ -68,9 +75,12 @@ class html_webdriver(Fetcher):
# Configs for Proxy setup
# In the ENV vars, is prefixed with "webdriver_", so it is for example "webdriver_sslProxy"
selenium_proxy_settings_mappings = ['ftpProxy', 'httpProxy', 'noProxy',
selenium_proxy_settings_mappings = ['proxyType', 'ftpProxy', 'httpProxy', 'noProxy',
'proxyAutoconfigUrl', 'sslProxy', 'autodetect',
'socksProxy', 'socksUsername', 'socksPassword']
'socksProxy', 'socksVersion', 'socksUsername', 'socksPassword']
proxy=None
def __init__(self):
@ -106,10 +116,13 @@ class html_webdriver(Fetcher):
# @todo - how to check this? is it possible?
self.status_code = 200
# @todo somehow we should try to get this working for WebDriver
# raise EmptyReply(url=url, status_code=r.status_code)
# @todo - dom wait loaded?
time.sleep(5)
self.content = driver.page_source
self.headers = {}
driver.quit()
@ -126,7 +139,6 @@ class html_webdriver(Fetcher):
# driver.quit() seems to cause better exceptions
driver.quit()
return True
# "html_requests" is listed as the default fetcher in store.py!
@ -143,13 +155,16 @@ class html_requests(Fetcher):
timeout=timeout,
verify=False)
# https://stackoverflow.com/questions/44203397/python-requests-get-returns-improperly-decoded-text-instead-of-utf-8
# Return bytes here
html = r.text
# @todo test this
# @todo maybe you really want to test zero-byte return pages?
if not r or not html or not len(html):
raise EmptyReply(url)
raise EmptyReply(url=url, status_code=r.status_code)
self.status_code = r.status_code
self.content = html
self.headers = r.headers

@ -58,10 +58,7 @@ class perform_site_check():
watch = self.datastore.data['watching'][uuid]
update_obj = {'previous_md5': self.datastore.data['watching'][uuid]['previous_md5'],
'history': {},
"last_checked": timestamp
}
update_obj = {}
extra_headers = self.datastore.get_val(uuid, 'headers')
@ -104,41 +101,61 @@ class perform_site_check():
# https://stackoverflow.com/questions/41817578/basic-method-chaining ?
# return content().textfilter().jsonextract().checksumcompare() ?
is_html = True
is_json = fetcher.headers.get('Content-Type', '') == 'application/json'
is_html = not is_json
css_filter_rule = watch['css_filter']
if css_filter_rule and len(css_filter_rule.strip()):
has_filter_rule = css_filter_rule and len(css_filter_rule.strip())
if is_json and not has_filter_rule:
css_filter_rule = "json:$"
has_filter_rule = True
if has_filter_rule:
if 'json:' in css_filter_rule:
stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule)
is_html = False
else:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
stripped_text_from_html = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)
if is_html:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content = fetcher.content
if css_filter_rule and len(css_filter_rule.strip()):
html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)
if has_filter_rule:
# For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
if css_filter_rule[0] == '/':
html_content = html_tools.xpath_filter(xpath_filter=css_filter_rule, html_content=fetcher.content)
else:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)
# get_text() via inscriptis
stripped_text_from_html = get_text(html_content)
# Re #340 - return the content before the 'ignore text' was applied
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
# We rely on the actual text in the html output.. many sites have random script vars etc,
# in the future we'll implement other mechanisms.
update_obj["last_check_status"] = fetcher.get_last_status_code()
update_obj["last_error"] = False
# If there's text to skip
# @todo we could abstract out the get_text() to handle this cleaner
if len(watch['ignore_text']):
stripped_text_from_html = self.strip_ignore_text(stripped_text_from_html, watch['ignore_text'])
text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
if len(text_to_ignore):
stripped_text_from_html = self.strip_ignore_text(stripped_text_from_html, text_to_ignore)
else:
stripped_text_from_html = stripped_text_from_html.encode('utf8')
# Re #133 - if we should strip whitespaces from triggering the change detected comparison
if self.datastore.data['settings']['application'].get('ignore_whitespace', False):
fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest()
else:
fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest()
fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest()
# On the first run of a site, watch['previous_md5'] will be an empty string, set it the current one.
if not len(watch['previous_md5']):
watch['previous_md5'] = fetched_md5
update_obj["previous_md5"] = fetched_md5
blocked_by_not_found_trigger_text = False
@ -160,16 +177,12 @@ class perform_site_check():
break
# could be None or False depending on JSON type
# On the first run of a site, watch['previous_md5'] will be an empty string
if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5:
changed_detected = True
# Don't confuse people by updating as last-changed, when it actually just changed from None..
if self.datastore.get_val(uuid, 'previous_md5'):
update_obj["last_changed"] = timestamp
update_obj["previous_md5"] = fetched_md5
update_obj["last_changed"] = timestamp
# Extract title as title
if is_html:
@ -178,4 +191,4 @@ class perform_site_check():
update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)
return changed_detected, update_obj, stripped_text_from_html
return changed_detected, update_obj, text_content_before_ignored_filter

@ -181,7 +181,7 @@ class ValidateListRegex(object):
message = field.gettext('RegEx \'%s\' is not a valid regular expression.')
raise ValidationError(message % (line))
class ValidateCSSJSONInput(object):
class ValidateCSSJSONXPATHInput(object):
"""
Filter validation
@todo CSS validator ;)
@ -191,6 +191,24 @@ class ValidateCSSJSONInput(object):
self.message = message
def __call__(self, form, field):
# Nothing to see here
if not len(field.data.strip()):
return
# Does it look like XPath?
if field.data.strip()[0] == '/':
from lxml import html, etree
tree = html.fromstring("<html></html>")
try:
tree.xpath(field.data.strip())
except etree.XPathEvalError as e:
message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
raise ValidationError(message % (field.data, str(e)))
except:
raise ValidationError("A system-error occurred when validating your XPath expression")
if 'json:' in field.data:
from jsonpath_ng.exceptions import JsonPathParserError, JsonPathLexerError
from jsonpath_ng.ext import parse
@ -202,6 +220,8 @@ class ValidateCSSJSONInput(object):
except (JsonPathParserError, JsonPathLexerError) as e:
message = field.gettext('\'%s\' is not a valid JSONPath expression. (%s)')
raise ValidationError(message % (input, str(e)))
except:
raise ValidationError("A system-error occurred when validating your JSONPath expression")
# Re #265 - maybe in the future fetch the page and offer a
# warning/notice that its possible the rule doesnt yet match anything?
@ -229,7 +249,7 @@ class watchForm(commonSettingsForm):
minutes_between_check = html5.IntegerField('Maximum time in minutes until recheck',
[validators.Optional(), validators.NumberRange(min=1)])
css_filter = StringField('CSS/JSON Filter', [ValidateCSSJSONInput()])
css_filter = StringField('CSS/JSON/XPATH Filter', [ValidateCSSJSONXPATHInput()])
title = StringField('Title')
ignore_text = StringListField('Ignore Text', [ValidateListRegex()])
@ -258,3 +278,5 @@ class globalSettingsForm(commonSettingsForm):
[validators.NumberRange(min=1)])
extract_title_as_title = BooleanField('Extract <title> from document and use as watch title')
base_url = StringField('Base URL', validators=[validators.Optional()])
global_ignore_text = StringListField('Ignore Text', [ValidateListRegex()])
ignore_whitespace = BooleanField('Ignore whitespace')

@ -17,6 +17,20 @@ def css_filter(css_filter, html_content):
return html_block + "\n"
# Return str Utf-8 of matched rules
def xpath_filter(xpath_filter, html_content):
from lxml import html
from lxml import etree
tree = html.fromstring(html_content)
html_block = ""
for item in tree.xpath(xpath_filter.strip()):
html_block+= etree.tostring(item, pretty_print=True).decode('utf-8')+"<br/>"
return html_block
# Extract/find element
def extract_element(find='title', html_content=''):

@ -45,6 +45,8 @@ class ChangeDetectionStore:
'base_url' : None,
'extract_title_as_title': False,
'fetch_backend': 'html_requests',
'global_ignore_text': [], # List of text to ignore when calculating the comparison checksum
'ignore_whitespace': False,
'notification_urls': [], # Apprise URL list
# Custom notification content
'notification_title': None,
@ -369,6 +371,10 @@ class ChangeDetectionStore:
import uuid
output_path = "{}/{}".format(self.datastore_path, watch_uuid)
# Incase the operator deleted it, check and create.
if not os.path.isdir(output_path):
mkdir(output_path)
fname = "{}/{}.stripped.txt".format(output_path, uuid.uuid4())
with open(fname, 'wb') as f:
f.write(contents)

@ -95,8 +95,10 @@ User-Agent: wonderbra 1.0") }}
<li>CSS - Limit text to this CSS rule, only text matching this CSS rule is included.</li>
<li>JSON - Limit text to this JSON rule, using <a href="https://pypi.org/project/jsonpath-ng/">JSONPath</a>, prefix with <b>"json:"</b>, <a
href="https://jsonpath.com/" target="new">test your JSONPath here</a></li>
<li>XPATH - Limit text to this XPath rule, simply start with a forward-slash, example <b>//*[contains(@class, 'sametext')]</b>, <a
href="http://xpather.com/" target="new">test your XPath here</a></li>
</ul>
Please be sure that you thoroughly understand how to write CSS or JSONPath selector rules before filing an issue on GitHub! <a
Please be sure that you thoroughly understand how to write CSS or JSONPath, XPath selector rules before filing an issue on GitHub! <a
href="https://github.com/dgtlmoon/changedetection.io/wiki/CSS-Selector-help">here for more CSS selector help</a>.<br/>
</span>
</div>
@ -107,8 +109,11 @@ User-Agent: wonderbra 1.0") }}
/some.regex\d{2}/ for case-INsensitive regex
") }}
<span class="pure-form-message-inline">
Each line processed separately, any line matching will be ignored.<br/>
Regular Expression support, wrap the line in forward slash <b>/regex/</b>.
<ul>
<li>Each line processed separately, any line matching will be ignored (removed before creating the checksum)</li>
<li>Regular Expression support, wrap the line in forward slash <b>/regex/</b></li>
<li>Changing this will affect the comparison checksum which may trigger an alert</li>
</ul>
</span>
</fieldset>

@ -13,6 +13,7 @@
<li class="tab" id="default-tab"><a href="#general">General</a></li>
<li class="tab"><a href="#notifications">Notifications</a></li>
<li class="tab"><a href="#fetching">Fetching</a></li>
<li class="tab"><a href="#filters">Global Filters</a></li>
</ul>
</div>
<div class="box-wrap inner">
@ -65,6 +66,34 @@
</span>
</div>
</div>
<div class="tab-pane-inner" id="filters">
<fieldset class="pure-group">
{{ render_field(form.ignore_whitespace) }}
<span class="pure-form-message-inline">Ignore whitespace, tabs and new-lines/line-feeds when considering if a change was detected.<br/>
<i>Note:</i> Changing this will change the status of your existing watches, possibily trigger alerts etc.
</span>
</fieldset>
<fieldset class="pure-group">
{{ render_field(form.global_ignore_text, rows=5, placeholder="Some text to ignore in a line
/some.regex\d{2}/ for case-INsensitive regex
") }}
<span class="pure-form-message-inline">Note: This is applied globally in addition to the per-watch rules.</span><br/>
<span class="pure-form-message-inline">
<ul>
<li>Note: This is applied globally in addition to the per-watch rules.</li>
<li>Each line processed separately, any line matching will be ignored (removed before creating the checksum)</li>
<li>Regular Expression support, wrap the line in forward slash <b>/regex/</b></li>
<li>Changing this will affect the comparison checksum which may trigger an alert</li>
</ul>
</span>
</fieldset>
</div>
<div id="actions">
<div class="pure-control-group">
<button type="submit" class="pure-button pure-button-primary">Save</button>

@ -18,7 +18,8 @@ def cleanup(datastore_path):
'url-watches.json',
'notification.txt',
'count.txt',
'endpoint-content.txt']
'endpoint-content.txt'
]
for file in files:
try:
os.unlink("{}/{}".format(datastore_path, file))

@ -0,0 +1,25 @@
#!/usr/bin/python3
import time
from flask import url_for
from urllib.request import urlopen
from . util import set_original_response, set_modified_response, live_server_setup
def test_backup(client, live_server):
live_server_setup(live_server)
# Give the endpoint time to spin up
time.sleep(1)
res = client.get(
url_for("get_backup"),
follow_redirects=True
)
# Should get the right zip content type
assert res.content_type == "application/zip"
# Should be PK/ZIP stream
assert res.data.count(b'PK') >= 2

@ -0,0 +1,38 @@
#!/usr/bin/python3
import time
from flask import url_for
from . util import live_server_setup
from ..html_tools import *
def test_setup(live_server):
live_server_setup(live_server)
def test_error_handler(client, live_server):
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint_403_error', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
assert b'Status Code 403' in res.data
assert bytes("just now".encode('utf-8')) in res.data

@ -151,3 +151,88 @@ def test_check_ignore_text_functionality(client, live_server):
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_check_global_ignore_text_functionality(client, live_server):
sleep_time_for_fetch_thread = 3
ignore_text = "XXXXX\r\nYYYYY\r\nZZZZZ"
set_original_ignore_response()
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# Goto the settings page, add our ignore text
res = client.post(
url_for("settings_page"),
data={
"minutes_between_check": 180,
"global_ignore_text": ignore_text,
'fetch_backend': "html_requests"
},
follow_redirects=True
)
assert b"Settings updated." in res.data
# Goto the edit page of the item, add our ignore text
# Add our URL to the import page
res = client.post(
url_for("edit_page", uuid="first"),
data={"ignore_text": "something irrelevent but just to check", "url": test_url, 'fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Check it saved
res = client.get(
url_for("settings_page"),
)
assert bytes(ignore_text.encode('utf-8')) in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
assert b'/test-endpoint' in res.data
# Make a change
set_modified_ignore_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
assert b'/test-endpoint' in res.data
# Just to be sure.. set a regular modified change..
set_modified_original_ignore_response()
client.get(url_for("api_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' in res.data
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data

@ -0,0 +1,96 @@
#!/usr/bin/python3
import time
from flask import url_for
from . util import live_server_setup
def test_setup(live_server):
live_server_setup(live_server)
# Should be the same as set_original_ignore_response() but with a little more whitespacing
def set_original_ignore_response_but_with_whitespace():
test_return_data = """<html>
<body>
Some initial text</br>
<p>
Which is across multiple lines</p>
<br>
</br>
So let's see what happens. </br>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
def set_original_ignore_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
# If there was only a change in the whitespacing, then we shouldnt have a change detected
def test_check_ignore_whitespace(client, live_server):
sleep_time_for_fetch_thread = 3
# Give the endpoint time to spin up
time.sleep(1)
set_original_ignore_response()
# Goto the settings page, add our ignore text
res = client.post(
url_for("settings_page"),
data={
"minutes_between_check": 180,
"ignore_whitespace": "y",
'fetch_backend': "html_requests"
},
follow_redirects=True
)
assert b"Settings updated." in res.data
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
set_original_ignore_response_but_with_whitespace()
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
assert b'/test-endpoint' in res.data

@ -111,6 +111,21 @@ def set_original_response():
f.write(test_return_data)
return None
def set_response_with_html():
test_return_data = """
{
"test": [
{
"html": "<b>"
}
]
}
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
return None
def set_modified_response():
test_return_data = """
{
@ -138,6 +153,37 @@ def set_modified_response():
return None
def test_check_json_without_filter(client, live_server):
# Request a JSON document from a application/json source containing HTML
# and be sure it doesn't get chewed up by instriptis
set_response_with_html()
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint_json', _external=True)
client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True
)
assert b'&#34;&lt;b&gt;' in res.data
assert res.data.count(b'{\n') >= 2
def test_check_json_filter(client, live_server):
json_filter = 'json:boss.name'

@ -159,6 +159,9 @@ def test_check_notification(client, live_server):
with open("test-datastore/notification.txt", "r") as f:
notification_submission = f.read()
print ("Notification submission was:", notification_submission)
# Re #342 - check for accidental python byte encoding of non-utf8/string
assert "b'" not in notification_submission
assert re.search('Watch UUID: [0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}', notification_submission, re.IGNORECASE)
assert "Watch title: my title" in notification_submission

@ -0,0 +1,118 @@
#!/usr/bin/python3
import time
from flask import url_for
from . util import live_server_setup
from ..html_tools import *
def test_setup(live_server):
live_server_setup(live_server)
def set_original_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
<div class="sametext">Some text thats the same</div>
<div class="changetext">Some text that will change</div>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
return None
def set_modified_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. THIS CHANGES AND SHOULDNT TRIGGER A CHANGE</br>
<div class="sametext">Some text thats the same</div>
<div class="changetext">Some new text</div>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
return None
def test_check_markup_xpath_filter_restriction(client, live_server):
sleep_time_for_fetch_thread = 3
xpath_filter = "//*[contains(@class, 'sametext')]"
set_original_response()
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# Goto the edit page, add our ignore text
# Add our URL to the import page
res = client.post(
url_for("edit_page", uuid="first"),
data={"css_filter": xpath_filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# view it/reset state back to viewed
client.get(url_for("diff_history_page", uuid="first"), follow_redirects=True)
# Make a change
set_modified_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
def test_xpath_validation(client, live_server):
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
res = client.post(
url_for("edit_page", uuid="first"),
data={"css_filter": "/something horrible", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"is not a valid XPath expression" in res.data

@ -44,6 +44,23 @@ def live_server_setup(live_server):
with open("test-datastore/endpoint-content.txt", "r") as f:
return f.read()
@live_server.app.route('/test-endpoint-json')
def test_endpoint_json():
from flask import make_response
with open("test-datastore/endpoint-content.txt", "r") as f:
resp = make_response(f.read())
resp.headers['Content-Type'] = 'application/json'
return resp
@live_server.app.route('/test-403')
def test_endpoint_403_error():
from flask import make_response
resp = make_response('', 403)
return resp
# Just return the headers in the request
@live_server.app.route('/test-headers')
def test_headers():

@ -2,7 +2,12 @@ import threading
import queue
import time
# Requests for checking on the site use a pool of thread Workers managed by a Queue.
# A single update worker
#
# Requests for checking on a single site(watch) from a queue of watches
# (another process inserts watches into the queue that are time-ready for checking)
class update_worker(threading.Thread):
current_uuid = None
@ -34,92 +39,108 @@ class update_worker(threading.Thread):
changed_detected = False
contents = ""
update_obj= {}
now = time.time()
try:
now = time.time()
changed_detected, update_obj, contents = update_handler.run(uuid)
# Always record that we atleast tried
self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3)})
# Re #342
# In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
# We then convert/.decode('utf-8') for the notification etc
if not isinstance(contents, (bytes, bytearray)):
raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
except PermissionError as e:
self.app.logger.error("File permission error updating", uuid, str(e))
except content_fetcher.EmptyReply as e:
self.datastore.update_watch(uuid=uuid, update_obj={'last_error':str(e)})
# Some kind of custom to-str handler in the exception handler that does this?
err_text = "EmptyReply: Status Code {}".format(e.status_code)
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code})
except Exception as e:
self.app.logger.error("Exception reached processing watch UUID:%s - %s", uuid, str(e))
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
else:
if update_obj:
try:
self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
if changed_detected:
n_object = {}
# A change was detected
fname = self.datastore.save_history_text(watch_uuid=uuid, contents=contents)
# Update history with the stripped text for future reference, this will also mean we save the first
# Should always be keyed by string(timestamp)
self.datastore.update_watch(uuid, {"history": {str(update_obj["last_checked"]): fname}})
watch = self.datastore.data['watching'][uuid]
print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
# Notifications should only trigger on the second time (first time, we gather the initial snapshot)
if len(watch['history']) > 1:
dates = list(watch['history'].keys())
# Convert to int, sort and back to str again
# @todo replace datastore getter that does this automatically
dates = [int(i) for i in dates]
dates.sort(reverse=True)
dates = [str(i) for i in dates]
prev_fname = watch['history'][dates[1]]
# Did it have any notification alerts to hit?
if len(watch['notification_urls']):
print(">>> Notifications queued for UUID from watch {}".format(uuid))
n_object['notification_urls'] = watch['notification_urls']
n_object['notification_title'] = watch['notification_title']
n_object['notification_body'] = watch['notification_body']
n_object['notification_format'] = watch['notification_format']
# No? maybe theres a global setting, queue them all
elif len(self.datastore.data['settings']['application']['notification_urls']):
print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(uuid))
n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format']
try:
watch = self.datastore.data['watching'][uuid]
fname = "" # Saved history text filename
# For the FIRST time we check a site, or a change detected, save the snapshot.
if changed_detected or not watch['last_checked']:
# A change was detected
fname = self.datastore.save_history_text(watch_uuid=uuid, contents=contents)
# Should always be keyed by string(timestamp)
self.datastore.update_watch(uuid, {"history": {str(round(time.time())): fname}})
# Generally update anything interesting returned
self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
# A change was detected
if changed_detected:
n_object = {}
print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
# Notifications should only trigger on the second time (first time, we gather the initial snapshot)
if len(watch['history']) > 1:
dates = list(watch['history'].keys())
# Convert to int, sort and back to str again
# @todo replace datastore getter that does this automatically
dates = [int(i) for i in dates]
dates.sort(reverse=True)
dates = [str(i) for i in dates]
prev_fname = watch['history'][dates[1]]
# Did it have any notification alerts to hit?
if len(watch['notification_urls']):
print(">>> Notifications queued for UUID from watch {}".format(uuid))
n_object['notification_urls'] = watch['notification_urls']
n_object['notification_title'] = watch['notification_title']
n_object['notification_body'] = watch['notification_body']
n_object['notification_format'] = watch['notification_format']
# No? maybe theres a global setting, queue them all
elif len(self.datastore.data['settings']['application']['notification_urls']):
print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(uuid))
n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format']
else:
print(">>> NO notifications queued, watch and global notification URLs were empty.")
# Only prepare to notify if the rules above matched
if 'notification_urls' in n_object:
# HTML needs linebreak, but MarkDown and Text can use a linefeed
if n_object['notification_format'] == 'HTML':
line_feed_sep = "</br>"
else:
print(">>> NO notifications queued, watch and global notification URLs were empty.")
# Only prepare to notify if the rules above matched
if 'notification_urls' in n_object:
# HTML needs linebreak, but MarkDown and Text can use a linefeed
if n_object['notification_format'] == 'HTML':
line_feed_sep = "</br>"
else:
line_feed_sep = "\n"
from changedetectionio import diff
n_object.update({
'watch_url': watch['url'],
'uuid': uuid,
'current_snapshot': str(contents),
'diff_full': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
'diff': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
})
self.notification_q.put(n_object)
except Exception as e:
print("!!!! Exception in update_worker !!!\n", e)
line_feed_sep = "\n"
from changedetectionio import diff
n_object.update({
'watch_url': watch['url'],
'uuid': uuid,
'current_snapshot': contents.decode('utf-8'),
'diff_full': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
'diff': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
})
self.notification_q.put(n_object)
except Exception as e:
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
print("!!!! Exception in update_worker !!!\n", e)
finally:
# Always record that we atleast tried
self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
'last_checked': round(time.time())})
self.current_uuid = None # Done
self.q.task_done()

@ -17,9 +17,9 @@ services:
# Alternative WebDriver/selenium URL, do not use "'s or 's!
# - WEBDRIVER_URL=http://browser-chrome:4444/wd/hub
#
# WebDriver proxy settings webdriver_ftpProxy, webdriver_httpProxy, webdriver_noProxy,
# WebDriver proxy settings webdriver_proxyType, webdriver_ftpProxy, webdriver_httpProxy, webdriver_noProxy,
# webdriver_proxyAutoconfigUrl, webdriver_sslProxy, webdriver_autodetect,
# webdriver_socksProxy, webdriver_socksUsername, webdriver_socksPassword
# webdriver_socksProxy, webdriver_socksUsername, webdriver_socksVersion, webdriver_socksPassword
#
# https://selenium-python.readthedocs.io/api.html#module-selenium.webdriver.common.proxy
#
@ -43,7 +43,8 @@ services:
restart: unless-stopped
# Used for fetching pages via WebDriver+Chrome where you need Javascript support.
# Does not work on rPi, https://github.com/dgtlmoon/changedetection.io/wiki/Fetching-pages-with-WebDriver
# Now working on arm64 (needs testing on rPi - tested on Oracle ARM instance)
# replace image with seleniarm/standalone-chromium:4.0.0-20211213
# browser-chrome:
# hostname: browser-chrome

@ -26,7 +26,11 @@ paho-mqtt
# ERROR: Could not build wheels for cryptography which use PEP 517 and cannot be installed directly
cryptography ~= 3.4
# Used for CSS filtering, replace with soupsieve and lxml for xpath
# Used for CSS filtering
bs4
selenium ~= 3.141
# XPath filtering, lxml is required by bs4 anyway, but put it here to be safe.
lxml
# 3.141 was missing socksVersion, 3.150 was not in pypi, so we try 4.1.0
selenium ~= 4.1.0

Loading…
Cancel
Save