Apprise notifications (#43)

* issue #4 Adding settings screen for apprise URLS
* Adding test notification mechanism

* Move Worker module to own class file

* Adding basic notification URL runner

* Tests for notifications

* Tweak readme with notification info

* Move notification test to main test_backend.py

* Fix spacing

* Adding notifications screenshot

* Cleanup more files from test

* Offer send notification test on individual edits and main/default

* Process global notifications

* All branches test

* Wrap worker notification process in try/catch, use global if nothing set

* Fix syntax

* Handle exception, increase wait time for liveserver to come up

* Fixing test setup

* remove debug

* Split tests into their own totally isolated setups, if you know a better way to make live_server() work, MR :)

* Tidying up lint/imports
pull/59/head
dgtlmoon 3 years ago committed by GitHub
parent b752690f89
commit f877af75b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,10 +1,7 @@
name: Test only
on:
push:
branches:
- /refs/heads/*
- !master
# Triggers the workflow on push or pull request events
on: [push, pull_request]
jobs:
build:
@ -31,5 +28,6 @@ jobs:
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Test with pytest
run: |
cd backend; pytest
# Each test is totally isolated and performs its own cleanup/reset
cd backend; ./run_all_tests.sh

@ -55,6 +55,29 @@ Examining differences in content.
Please :star: star :star: this project and help it grow! https://github.com/dgtlmoon/changedetection.io/
### Notifications
ChangeDetection.io supports a massive amount of notifications (including email, office365, custom APIs, etc) when a web-page has a change detected thanks to the <a href="https://github.com/caronc/apprise">apprise</a> library.
Simply set one or more notification URL's in the _[edit]_ tab of that watch.
Just some examples
discord://webhook_id/webhook_token
flock://app_token/g:channel_id
gitter://token/room
gchat://workspace/key/token
msteams://TokenA/TokenB/TokenC/
o365://TenantID:AccountEmail/ClientID/ClientSecret/TargetEmail
rocket://user:password@hostname/#Channel
mailto://user:pass@example.com?to=receivingAddress@example.com
json://someserver.com/custom-api
syslog://
<a href="https://github.com/caronc/apprise">And everything else in this list!</a>
<img src="https://raw.githubusercontent.com/dgtlmoon/changedetection.io/master/screenshot-notifications.png" style="max-width:100%;" alt="Self-hosted web page change monitoring notifications" title="Self-hosted web page change monitoring notifications" />
### Notes
- Does not yet support Javascript

@ -41,7 +41,9 @@ extra_stylesheets = []
update_q = queue.Queue()
app = Flask(__name__, static_url_path="/var/www/change-detection/backen/static")
notification_q = queue.Queue()
app = Flask(__name__, static_url_path="/var/www/change-detection/backend/static")
# Stop browser caching of assets
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
@ -347,11 +349,22 @@ def changedetection_app(conig=None, datastore_o=None):
'headers': extra_headers
}
# Notification URLs
form_notification_text = request.form.get('notification_urls')
notification_urls = []
if form_notification_text:
for text in form_notification_text.strip().split("\n"):
text = text.strip()
if len(text):
notification_urls.append(text)
datastore.data['watching'][uuid]['notification_urls'] = notification_urls
# Ignore text
form_ignore_text = request.form.get('ignore-text').strip()
form_ignore_text = request.form.get('ignore-text')
ignore_text = []
if len(form_ignore_text):
for text in form_ignore_text.split("\n"):
if form_ignore_text:
for text in form_ignore_text.strip().split("\n"):
text = text.strip()
if len(text):
ignore_text.append(text)
@ -368,6 +381,14 @@ def changedetection_app(conig=None, datastore_o=None):
messages.append({'class': 'ok', 'message': 'Updated watch.'})
trigger_n = request.form.get('trigger-test-notification')
if trigger_n:
n_object = {'watch_url': url,
'notification_urls': datastore.data['settings']['application']['notification_urls']}
notification_q.put(n_object)
messages.append({'class': 'ok', 'message': 'Notifications queued.'})
return redirect(url_for('index'))
else:
@ -381,6 +402,30 @@ def changedetection_app(conig=None, datastore_o=None):
global messages
if request.method == 'GET':
if request.values.get('notification-test'):
url_count = len(datastore.data['settings']['application']['notification_urls'])
if url_count:
import apprise
apobj = apprise.Apprise()
apobj.debug = True
# Add each notification
for n in datastore.data['settings']['application']['notification_urls']:
apobj.add(n)
outcome = apobj.notify(
body='Hello from the worlds best and simplest web page change detection and monitoring service!',
title='Changedetection.io Notification Test',
)
if outcome:
messages.append(
{'class': 'notice', 'message': "{} Notification URLs reached.".format(url_count)})
else:
messages.append(
{'class': 'error', 'message': "One or more Notification URLs failed"})
return redirect(url_for('settings_page'))
if request.values.get('removepassword'):
from pathlib import Path
@ -417,16 +462,30 @@ def changedetection_app(conig=None, datastore_o=None):
if minutes >= 5:
datastore.data['settings']['requests']['minutes_between_check'] = minutes
datastore.needs_write = True
messages.append({'class': 'ok', 'message': "Updated"})
else:
messages.append(
{'class': 'error', 'message': "Must be atleast 5 minutes."})
# 'validators' package doesnt work because its often a non-stanadard protocol. :(
datastore.data['settings']['application']['notification_urls'] = []
trigger_n = request.form.get('trigger-test-notification')
for n in request.values.get('notification_urls').strip().split("\n"):
url = n.strip()
datastore.data['settings']['application']['notification_urls'].append(url)
datastore.needs_write = True
if trigger_n:
n_object = {'watch_url': "Test from changedetection.io!",
'notification_urls': datastore.data['settings']['application']['notification_urls']}
notification_q.put(n_object)
messages.append({'class': 'ok', 'message': 'Notifications queued.'})
output = render_template("settings.html", messages=messages,
minutes=datastore.data['settings']['requests']['minutes_between_check'])
minutes=datastore.data['settings']['requests']['minutes_between_check'],
notification_urls="\r\n".join(
datastore.data['settings']['application']['notification_urls']))
messages = []
return output
@ -687,6 +746,8 @@ def changedetection_app(conig=None, datastore_o=None):
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
threading.Thread(target=notification_runner).start()
# Check for new release version
threading.Thread(target=check_for_new_version).start()
return app
@ -718,54 +779,42 @@ def check_for_new_version():
# Check daily
app.config.exit.wait(86400)
def notification_runner():
# Requests for checking on the site use a pool of thread Workers managed by a Queue.
class Worker(threading.Thread):
current_uuid = None
def __init__(self, q, *args, **kwargs):
self.q = q
super().__init__(*args, **kwargs)
def run(self):
from backend import fetch_site_status
update_handler = fetch_site_status.perform_site_check(datastore=datastore)
while not app.config.exit.is_set():
try:
# At the moment only one thread runs (single runner)
n_object = notification_q.get(block=False)
except queue.Empty:
time.sleep(1)
pass
while not app.config.exit.is_set():
else:
import apprise
# Create an Apprise instance
try:
uuid = self.q.get(block=False)
except queue.Empty:
pass
else:
self.current_uuid = uuid
apobj = apprise.Apprise()
for url in n_object['notification_urls']:
apobj.add(url.strip())
if uuid in list(datastore.data['watching'].keys()):
try:
changed_detected, result, contents = update_handler.run(uuid)
apobj.notify(
body=n_object['watch_url'],
# @todo This should be configurable.
title="ChangeDetection.io Notification - {}".format(n_object['watch_url'])
)
except PermissionError as s:
app.logger.error("File permission error updating", uuid, str(s))
else:
if result:
datastore.update_watch(uuid=uuid, update_obj=result)
if changed_detected:
# A change was detected
datastore.save_history_text(uuid=uuid, contents=contents, result_obj=result)
self.current_uuid = None # Done
self.q.task_done()
app.config.exit.wait(1)
except Exception as e:
print("Watch URL: {} Error {}".format(n_object['watch_url'],e))
# Thread runner to check every minute, look for new watches to feed into the Queue.
def ticker_thread_check_time_launch_checks():
from backend import update_worker
# Spin up Workers.
for _ in range(datastore.data['settings']['requests']['workers']):
new_worker = Worker(update_q)
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()

@ -1,7 +1,7 @@
[pytest]
addopts = --no-start-live-server --live-server-port=5005
#testpaths = tests pytest_invenio
#live_server_scope = session
#live_server_scope = function
filterwarnings =
ignore::DeprecationWarning:urllib3.*:

@ -0,0 +1,16 @@
#!/bin/bash
# live_server will throw errors even with live_server_scope=function if I have the live_server setup in different functions
# and I like to restart the server for each test (and have the test cleanup after each test)
# merge request welcome :)
# exit when any command fails
set -e
find tests/test_*py -type f|while read test_name
do
echo "TEST RUNNING $test_name"
pytest $test_name
done

@ -39,7 +39,8 @@ class ChangeDetectionStore:
'workers': 10 # Number of threads, lower is better for slow connections
},
'application': {
'password': False
'password': False,
'notification_urls': [] # Apprise URL list
}
}
}
@ -58,7 +59,8 @@ class ChangeDetectionStore:
'uuid': str(uuid_builder.uuid4()),
'headers': {}, # Extra headers to send
'history': {}, # Dict of timestamp and output stripped filename
'ignore_text': [] # List of text to ignore when calculating the comparison checksum
'ignore_text': [], # List of text to ignore when calculating the comparison checksum
'notification_urls': [] # List of URLs to add to the notification Queue (Usually AppRise)
}
if path.isfile('/source.txt'):
@ -109,7 +111,7 @@ class ChangeDetectionStore:
self.add_watch(url='https://www.gov.uk/coronavirus', tag='Covid')
self.add_watch(url='https://changedetection.io', tag='Tech news')
self.__data['version_tag'] = "0.292"
self.__data['version_tag'] = "0.30"
if not 'app_guid' in self.__data:
import sys

@ -49,8 +49,24 @@ User-Agent: wonderbra 1.0"
<br/>
</fieldset>
<div class="pure-control-group">
<label for="tag">Notification URLs</label>
<textarea id="notification_urls" name="notification_urls" class="pure-input-1-2" placeholder=""
style="width: 100%;
font-family:monospace;
white-space: pre;
overflow-wrap: normal;
overflow-x: scroll;" rows="5">{% for value in watch.notification_urls %}{{ value }}
{% endfor %}</textarea>
<span class="pure-form-message-inline">Use <a target=_new href="https://github.com/caronc/apprise">AppRise URLs</a> for notification to just about any service!</a> </span>
<br/>
<div class="pure-controls">
<span class="pure-form-message-inline"><label for="trigger-test-notification" class="pure-checkbox">
<input type="checkbox" id="trigger-test-notification" name="trigger-test-notification"> Send test notification on save.</label></span>
</div>
</div>
<br/>
<div class="pure-control-group">
<button type="submit" class="pure-button pure-button-primary">Save</button>
</div>

@ -13,6 +13,7 @@
<span class="pure-form-message-inline">This is a required field.</span>
</div>
<br/>
<hr>
<div class="pure-control-group">
<label for="minutes">Password protection</label>
<input type="password" id="password" name="password" size="15"/>
@ -21,6 +22,27 @@
{% endif %}
</div>
<br/>
<hr>
<div class="pure-control-group">
<label for="minutes">Global notification settings</label><br/>
Notification URLs <a href="https://github.com/caronc/apprise"> see Apprise examples</a>.
<textarea style="overflow-wrap: normal; overflow-x: scroll;" id="notification_urls" name="notification_urls" cols="80"
rows="6" wrap=off placeholder="Example:
Gitter - gitter://token/room
Office365 - o365://TenantID:AccountEmail/ClientID/ClientSecret/TargetEmail
AWS SNS - sns://AccessKeyID/AccessSecretKey/RegionName/+PhoneNo
SMTPS - mailtos://user:pass@mail.domain.com?to=receivingAddress@example.com
">{{notification_urls}}</textarea>
</div>
<div class="pure-controls">
<span class="pure-form-message-inline"><label for="trigger-test-notification" class="pure-checkbox">
<input type="checkbox" id="trigger-test-notification" name="trigger-test-notification"> Send test notification on save.</label></span>
</div>
<br/>
<div class="pure-control-group">
<button type="submit" class="pure-button pure-button-primary">Save</button>

@ -35,13 +35,12 @@ def app(request):
def teardown():
datastore.stop_thread = True
app.config.exit.set()
try:
os.unlink("{}/url-watches.json".format(datastore_path))
except FileNotFoundError:
# This is fine in the case of a failure.
pass
assert 1 == 1
for fname in ["url-watches.json", "count.txt", "output.txt"]:
try:
os.unlink("{}/{}".format(datastore_path, fname))
except FileNotFoundError:
# This is fine in the case of a failure.
pass
request.addfinalizer(teardown)
yield app

@ -0,0 +1,58 @@
from flask import url_for
def test_check_access_control(app, client):
# Still doesnt work, but this is closer.
return
with app.test_client() as c:
# Check we dont have any password protection enabled yet.
res = c.get(url_for("settings_page"))
assert b"Remove password" not in res.data
# Enable password check.
res = c.post(
url_for("settings_page"),
data={"password": "foobar"},
follow_redirects=True
)
assert b"Password protection enabled." in res.data
assert b"LOG OUT" not in res.data
print ("SESSION:", res.session)
# Check we hit the login
res = c.get(url_for("settings_page"), follow_redirects=True)
res = c.get(url_for("login"), follow_redirects=True)
assert b"Login" in res.data
print ("DEBUG >>>>>",res.data)
# Menu should not be available yet
assert b"SETTINGS" not in res.data
assert b"BACKUP" not in res.data
assert b"IMPORT" not in res.data
#defaultuser@changedetection.io is actually hardcoded for now, we only use a single password
res = c.post(
url_for("login"),
data={"password": "foobar", "email": "defaultuser@changedetection.io"},
follow_redirects=True
)
assert b"LOG OUT" in res.data
res = c.get(url_for("settings_page"))
# Menu should be available now
assert b"SETTINGS" in res.data
assert b"BACKUP" in res.data
assert b"IMPORT" in res.data
assert b"LOG OUT" in res.data
# Now remove the password so other tests function, @todo this should happen before each test automatically
c.get(url_for("settings_page", removepassword="true"))
c.get(url_for("import_page"))
assert b"LOG OUT" not in res.data

@ -3,55 +3,16 @@
import time
from flask import url_for
from urllib.request import urlopen
import pytest
from . util import set_original_response, set_modified_response, live_server_setup
sleep_time_for_fetch_thread = 3
def test_setup_liveserver(live_server):
@live_server.app.route('/test-endpoint')
def test_endpoint():
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open("test-datastore/output.txt", "r") as f:
return f.read()
live_server.start()
assert 1 == 1
def set_original_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
</body>
</html>
"""
with open("test-datastore/output.txt", "w") as f:
f.write(test_return_data)
def set_modified_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>which has this one new line</p>
</br>
So let's see what happens. </br>
</body>
</html>
"""
with open("test-datastore/output.txt", "w") as f:
f.write(test_return_data)
def test_check_basic_change_detection_functionality(client, live_server):
set_original_response()
live_server_setup(live_server)
# Add our URL to the import page
res = client.post(
@ -128,59 +89,3 @@ def test_check_basic_change_detection_functionality(client, live_server):
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_check_access_control(app, client):
# Still doesnt work, but this is closer.
return
with app.test_client() as c:
# Check we dont have any password protection enabled yet.
res = c.get(url_for("settings_page"))
assert b"Remove password" not in res.data
# Enable password check.
res = c.post(
url_for("settings_page"),
data={"password": "foobar"},
follow_redirects=True
)
assert b"Password protection enabled." in res.data
assert b"LOG OUT" not in res.data
print ("SESSION:", res.session)
# Check we hit the login
res = c.get(url_for("settings_page"), follow_redirects=True)
res = c.get(url_for("login"), follow_redirects=True)
assert b"Login" in res.data
print ("DEBUG >>>>>",res.data)
# Menu should not be available yet
assert b"SETTINGS" not in res.data
assert b"BACKUP" not in res.data
assert b"IMPORT" not in res.data
#defaultuser@changedetection.io is actually hardcoded for now, we only use a single password
res = c.post(
url_for("login"),
data={"password": "foobar", "email": "defaultuser@changedetection.io"},
follow_redirects=True
)
assert b"LOG OUT" in res.data
res = c.get(url_for("settings_page"))
# Menu should be available now
assert b"SETTINGS" in res.data
assert b"BACKUP" in res.data
assert b"IMPORT" in res.data
assert b"LOG OUT" in res.data
# Now remove the password so other tests function, @todo this should happen before each test automatically
c.get(url_for("settings_page", removepassword="true"))
c.get(url_for("import_page"))
assert b"LOG OUT" not in res.data

@ -2,9 +2,10 @@
import time
from flask import url_for
from urllib.request import urlopen
import pytest
from . util import live_server_setup
def test_setup(live_server):
live_server_setup(live_server)
# Unit test of the stripper
# Always we are dealing in utf-8

@ -0,0 +1,66 @@
import time
from flask import url_for
from . util import set_original_response, set_modified_response, live_server_setup
# Hard to just add more live server URLs when one test is already running (I think)
# So we add our test here (was in a different file)
def test_check_notification(client, live_server):
live_server_setup(live_server)
set_original_response()
# Give the endpoint time to spin up
time.sleep(3)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Give the thread time to pick it up
time.sleep(3)
# Goto the edit page, add our ignore text
# Add our URL to the import page
url = url_for('test_notification_endpoint', _external=True)
notification_url = url.replace('http', 'json')
print (">>>> Notification URL: "+notification_url)
res = client.post(
url_for("edit_page", uuid="first"),
data={"notification_urls": notification_url, "url": test_url, "tag": "", "headers": ""},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Hit the edit page, be sure that we saved it
res = client.get(
url_for("edit_page", uuid="first"))
assert bytes(notification_url.encode('utf-8')) in res.data
set_modified_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
# Did the front end see it?
res = client.get(
url_for("index"))
assert bytes("just now".encode('utf-8')) in res.data
# Check it triggered
res = client.get(
url_for("test_notification_counter"),
)
print (res.data)
assert bytes("we hit it".encode('utf-8')) in res.data

@ -0,0 +1,60 @@
#!/usr/bin/python3
def set_original_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
</body>
</html>
"""
with open("test-datastore/output.txt", "w") as f:
f.write(test_return_data)
return None
def set_modified_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>which has this one new line</p>
</br>
So let's see what happens. </br>
</body>
</html>
"""
with open("test-datastore/output.txt", "w") as f:
f.write(test_return_data)
return None
def live_server_setup(live_server):
@live_server.app.route('/test-endpoint')
def test_endpoint():
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open("test-datastore/output.txt", "r") as f:
return f.read()
@live_server.app.route('/test_notification_endpoint', methods=['POST'])
def test_notification_endpoint():
with open("test-datastore/count.txt", "w") as f:
f.write("we hit it")
print("\n>> Test notification endpoint was hit.\n")
return "Text was set"
# And this should return not zero.
@live_server.app.route('/test_notification_counter')
def test_notification_counter():
try:
with open("test-datastore/count.txt", "r") as f:
return f.read()
except FileNotFoundError:
return "nope :("
live_server.start()

@ -0,0 +1,67 @@
import threading
import queue
# Requests for checking on the site use a pool of thread Workers managed by a Queue.
class update_worker(threading.Thread):
current_uuid = None
def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
self.q = q
self.app = app
self.notification_q = notification_q
self.datastore = datastore
super().__init__(*args, **kwargs)
def run(self):
from backend import fetch_site_status
update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)
while not self.app.config.exit.is_set():
try:
uuid = self.q.get(block=False)
except queue.Empty:
pass
else:
self.current_uuid = uuid
if uuid in list(self.datastore.data['watching'].keys()):
try:
changed_detected, result, contents = update_handler.run(uuid)
except PermissionError as s:
self.app.logger.error("File permission error updating", uuid, str(s))
else:
if result:
try:
self.datastore.update_watch(uuid=uuid, update_obj=result)
if changed_detected:
# A change was detected
self.datastore.save_history_text(uuid=uuid, contents=contents, result_obj=result)
watch = self.datastore.data['watching'][uuid]
# Did it have any notification alerts to hit?
if len(watch['notification_urls']):
print("Processing notifications for UUID: {}".format(uuid))
n_object = {'watch_url': self.datastore.data['watching'][uuid]['url'],
'notification_urls': watch['notification_urls']}
self.notification_q.put(n_object)
# No? maybe theres a global setting, queue them all
elif len(self.datastore.data['settings']['application']['notification_urls']):
print("Processing GLOBAL notifications for UUID: {}".format(uuid))
n_object = {'watch_url': self.datastore.data['watching'][uuid]['url'],
'notification_urls': self.datastore.data['settings']['application'][
'notification_urls']}
self.notification_q.put(n_object)
except Exception as e:
print("!!!! Exception in update_worker !!!\n", e)
self.current_uuid = None # Done
self.q.task_done()
self.app.config.exit.wait(1)

@ -1,8 +1,8 @@
chardet==2.3.0
flask~= 1.0
pytest ~=6.2
pytest-flask ~=1.1
eventlet>=0.31.0
pytest-flask ~=1.2
eventlet ~= 0.30
requests ~= 2.15
validators
timeago ~=1.0
@ -10,4 +10,5 @@ inscriptis ~= 1.1
feedgen ~= 0.9
flask-login ~= 0.5
pytz
urllib3
urllib3
apprise ~= 0.9

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

Loading…
Cancel
Save