API Interface (#617)

pull/620/head
dgtlmoon 3 years ago committed by GitHub
parent 2c834cfe37
commit 07e279b38d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -36,9 +36,12 @@ from flask import (
url_for,
)
from flask_login import login_required
from flask_restful import abort, Api
from flask_wtf import CSRFProtect
from changedetectionio import html_tools
from changedetectionio.api import api_v1
__version__ = '0.39.13.1'
@ -78,6 +81,8 @@ csrf.init_app(app)
notification_debug_log=[]
watch_api = Api(app, decorators=[csrf.exempt])
def init_app_secret(datastore_path):
secret = ""
@ -179,6 +184,25 @@ def changedetection_app(config=None, datastore_o=None):
login_manager.login_view = 'login'
app.secret_key = init_app_secret(config['datastore_path'])
watch_api.add_resource(api_v1.WatchSingleHistory,
'/api/v1/watch/<string:uuid>/history/<string:timestamp>',
resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
watch_api.add_resource(api_v1.WatchHistory,
'/api/v1/watch/<string:uuid>/history',
resource_class_kwargs={'datastore': datastore})
watch_api.add_resource(api_v1.CreateWatch, '/api/v1/watch',
resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
watch_api.add_resource(api_v1.Watch, '/api/v1/watch/<string:uuid>',
resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
# Setup cors headers to allow all domains
# https://flask-cors.readthedocs.io/en/latest/
# CORS(app)
@ -367,6 +391,8 @@ def changedetection_app(config=None, datastore_o=None):
if limit_tag != None:
# Support for comma separated list of tags.
if watch['tag'] is None:
continue
for tag_in_watch in watch['tag'].split(','):
tag_in_watch = tag_in_watch.strip()
if tag_in_watch == limit_tag:
@ -671,6 +697,7 @@ def changedetection_app(config=None, datastore_o=None):
form=form,
current_base_url = datastore.data['settings']['application']['base_url'],
hide_remove_pass=os.getenv("SALTED_PASS", False),
api_key=datastore.data['settings']['application'].get('api_access_token'),
emailprefix=os.getenv('NOTIFICATION_MAIL_BUTTON_PREFIX', False))
return output
@ -869,27 +896,6 @@ def changedetection_app(config=None, datastore_o=None):
return output
@app.route("/api/<string:uuid>/snapshot/current", methods=['GET'])
@login_required
def api_snapshot(uuid):
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
try:
watch = datastore.data['watching'][uuid]
except KeyError:
return abort(400, "No history found for the specified link, bad link?")
newest = list(watch['history'].keys())[-1]
with open(watch['history'][newest], 'r') as f:
content = f.read()
resp = make_response(content)
resp.headers['Content-Type'] = 'text/plain'
return resp
@app.route("/favicon.ico", methods=['GET'])
def favicon():
return send_from_directory("static/images", path="favicon.ico")
@ -1000,7 +1006,7 @@ def changedetection_app(config=None, datastore_o=None):
@app.route("/api/add", methods=['POST'])
@login_required
def api_watch_add():
def form_watch_add():
from changedetectionio import forms
form = forms.quickWatchForm(request.form)
@ -1026,7 +1032,7 @@ def changedetection_app(config=None, datastore_o=None):
@app.route("/api/delete", methods=['GET'])
@login_required
def api_delete():
def form_delete():
uuid = request.args.get('uuid')
if uuid != 'all' and not uuid in datastore.data['watching'].keys():
@ -1043,7 +1049,7 @@ def changedetection_app(config=None, datastore_o=None):
@app.route("/api/clone", methods=['GET'])
@login_required
def api_clone():
def form_clone():
uuid = request.args.get('uuid')
# More for testing, possible to return the first/only
if uuid == 'first':
@ -1057,7 +1063,7 @@ def changedetection_app(config=None, datastore_o=None):
@app.route("/api/checknow", methods=['GET'])
@login_required
def api_watch_checknow():
def form_watch_checknow():
tag = request.args.get('tag')
uuid = request.args.get('uuid')
@ -1094,7 +1100,7 @@ def changedetection_app(config=None, datastore_o=None):
@app.route("/api/share-url", methods=['GET'])
@login_required
def api_share_put_watch():
def form_share_put_watch():
"""Given a watch UUID, upload the info and return a share-link
the share-link can be imported/added"""
import requests

@ -0,0 +1,125 @@
from flask_restful import abort, Resource
from flask import request, make_response
import validators
from . import auth
# https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html
class Watch(Resource):
def __init__(self, **kwargs):
# datastore is a black box dependency
self.datastore = kwargs['datastore']
self.update_q = kwargs['update_q']
# Get information about a single watch, excluding the history list (can be large)
# curl http://localhost:4000/api/v1/watch/<string:uuid>
# ?recheck=true
@auth.check_token
def get(self, uuid):
from copy import deepcopy
watch = deepcopy(self.datastore.data['watching'].get(uuid))
if not watch:
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
if request.args.get('recheck'):
self.update_q.put(uuid)
return "OK", 200
# Return without history, get that via another API call
watch['history_n'] = len(watch['history'])
del (watch['history'])
return watch
@auth.check_token
def delete(self, uuid):
if not self.datastore.data['watching'].get(uuid):
abort(400, message='No watch exists with the UUID of {}'.format(uuid))
self.datastore.delete(uuid)
return 'OK', 204
class WatchHistory(Resource):
def __init__(self, **kwargs):
# datastore is a black box dependency
self.datastore = kwargs['datastore']
# Get a list of available history for a watch by UUID
# curl http://localhost:4000/api/v1/watch/<string:uuid>/history
def get(self, uuid):
watch = self.datastore.data['watching'].get(uuid)
if not watch:
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
return watch['history'], 200
class WatchSingleHistory(Resource):
def __init__(self, **kwargs):
# datastore is a black box dependency
self.datastore = kwargs['datastore']
# Read a given history snapshot and return its content
# <string:timestamp> or "latest"
# curl http://localhost:4000/api/v1/watch/<string:uuid>/history/<int:timestamp>
@auth.check_token
def get(self, uuid, timestamp):
watch = self.datastore.data['watching'].get(uuid)
if not watch:
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
if not len(watch['history']):
abort(404, message='Watch found but no history exists for the UUID {}'.format(uuid))
if timestamp == 'latest':
timestamp = list(watch['history'].keys())[-1]
with open(watch['history'][timestamp], 'r') as f:
content = f.read()
response = make_response(content, 200)
response.mimetype = "text/plain"
return response
class CreateWatch(Resource):
def __init__(self, **kwargs):
# datastore is a black box dependency
self.datastore = kwargs['datastore']
self.update_q = kwargs['update_q']
@auth.check_token
def post(self):
# curl http://localhost:4000/api/v1/watch -H "Content-Type: application/json" -d '{"url": "https://my-nice.com", "tag": "one, two" }'
json_data = request.get_json()
tag = json_data['tag'].strip() if json_data.get('tag') else ''
if not validators.url(json_data['url'].strip()):
return "Invalid or unsupported URL", 400
extras = {'title': json_data['title'].strip()} if json_data.get('title') else {}
new_uuid = self.datastore.add_watch(url=json_data['url'].strip(), tag=tag, extras=extras)
self.update_q.put(new_uuid)
return {'uuid': new_uuid}, 201
# Return concise list of available watches and some very basic info
# curl http://localhost:4000/api/v1/watch|python -mjson.tool
# ?recheck_all=1 to recheck all
@auth.check_token
def get(self):
list = {}
for k, v in self.datastore.data['watching'].items():
list[k] = {'url': v['url'],
'title': v['title'],
'last_checked': v['last_checked'],
'last_changed': v['last_changed'],
'last_error': v['last_error']}
if request.args.get('recheck_all'):
for uuid in self.datastore.data['watching'].keys():
self.update_q.put(uuid)
return {'status': "OK"}, 200
return list, 200

@ -0,0 +1,33 @@
from flask import request, make_response, jsonify
from functools import wraps
# Simple API auth key comparison
# @todo - Maybe short lived token in the future?
def check_token(f):
@wraps(f)
def decorated(*args, **kwargs):
datastore = args[0].datastore
config_api_token_enabled = datastore.data['settings']['application'].get('api_access_token_enabled')
if not config_api_token_enabled:
return
try:
api_key_header = request.headers['x-api-key']
except KeyError:
return make_response(
jsonify("No authorization x-api-key header."), 403
)
config_api_token = datastore.data['settings']['application'].get('api_access_token')
if api_key_header != config_api_token:
return make_response(
jsonify("Invalid access - API key invalid."), 403
)
return f(*args, **kwargs)
return decorated

@ -374,6 +374,7 @@ class globalSettingsApplicationForm(commonSettingsForm):
empty_pages_are_a_change = BooleanField('Treat empty pages as a change?', default=False)
render_anchor_tag_content = BooleanField('Render anchor tag content', default=False)
fetch_backend = RadioField('Fetch Method', default="html_requests", choices=content_fetcher.available_fetchers(), validators=[ValidateContentFetcherIsReady()])
api_access_token_enabled = BooleanField('API access token security check enabled', default=True, validators=[validators.Optional()])
password = SaltyPasswordField()

@ -27,6 +27,7 @@ class model(dict):
'proxy': None # Preferred proxy connection
},
'application': {
'api_access_token_enabled': True,
'password': False,
'base_url' : None,
'extract_title_as_title': False,

@ -8,9 +8,29 @@ $(document).ready(function() {
$('#webdriver-override-options').hide();
}
}
$('input[name="application-fetch_backend"]').click(function (e) {
toggle();
});
toggle();
$("#api-key").hover(
function () {
$("#api-key-copy").html('copy').fadeIn();
},
function () {
$("#api-key-copy").hide();
}
).click(function (e) {
$("#api-key-copy").html('copied');
var range = document.createRange();
var n = $("#api-key")[0];
range.selectNode(n);
window.getSelection().removeAllRanges();
window.getSelection().addRange(range);
document.execCommand("copy");
window.getSelection().removeAllRanges();
});
});

@ -456,3 +456,9 @@ ul {
#webdriver-override-options input[type="number"] {
width: 5em; }
#api-key:hover {
cursor: pointer; }
#api-key-copy {
color: #0078e7; }

@ -654,3 +654,13 @@ ul {
width: 5em;
}
}
#api-key {
&:hover {
cursor: pointer;
}
}
#api-key-copy {
color: #0078e7;
}

@ -12,6 +12,7 @@ from os import mkdir, path, unlink
from threading import Lock
import re
import requests
import secrets
from . model import App, Watch
@ -107,10 +108,13 @@ class ChangeDetectionStore:
# Generate the URL access token for RSS feeds
if not 'rss_access_token' in self.__data['settings']['application']:
import secrets
secret = secrets.token_hex(16)
self.__data['settings']['application']['rss_access_token'] = secret
# Generate the API access token
if not 'api_access_token' in self.__data['settings']['application']:
secret = secrets.token_hex(16)
self.__data['settings']['application']['api_access_token'] = secret
# Proxy list support - available as a selection in settings when text file is imported
# CSV list
@ -211,7 +215,8 @@ class ChangeDetectionStore:
def get_all_tags(self):
tags = []
for uuid, watch in self.data['watching'].items():
if watch['tag'] is None:
continue
# Support for comma separated list of tags.
for tag in watch['tag'].split(','):
tag = tag.strip()
@ -280,6 +285,10 @@ class ChangeDetectionStore:
def add_watch(self, url, tag="", extras=None, write_to_disk_now=True):
if extras is None:
extras = {}
# should always be str
if tag is None or not tag:
tag=''
# Incase these are copied across, assume it's a reference and deepcopy()
apply_extras = deepcopy(extras)

@ -199,9 +199,9 @@ nav
{{ render_button(form.save_button) }} {{ render_button(form.save_and_preview_button) }}
<a href="{{url_for('api_delete', uuid=uuid)}}"
<a href="{{url_for('form_delete', uuid=uuid)}}"
class="pure-button button-small button-error ">Delete</a>
<a href="{{url_for('api_clone', uuid=uuid)}}"
<a href="{{url_for('form_clone', uuid=uuid)}}"
class="pure-button button-small ">Create Copy</a>
</div>
</div>

@ -20,6 +20,7 @@
<li class="tab"><a href="#notifications">Notifications</a></li>
<li class="tab"><a href="#fetching">Fetching</a></li>
<li class="tab"><a href="#filters">Global Filters</a></li>
<li class="tab"><a href="#api">API</a></li>
</ul>
</div>
<div class="box-wrap inner">
@ -43,6 +44,7 @@
<span class="pure-form-message-inline">Password is locked.</span>
{% endif %}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.base_url, placeholder="http://yoursite.com:5000/",
class="m-d") }}
@ -105,7 +107,6 @@
</fieldset>
</div>
<div class="tab-pane-inner" id="filters">
<fieldset class="pure-group">
@ -150,12 +151,26 @@ nav
</fieldset>
</div>
<div class="tab-pane-inner" id="api">
<p>Drive your changedetection.io via API, More about <a href="https://github.com/dgtlmoon/changedetection.io/wiki/API-Reference">API access here</a></p>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.api_access_token_enabled) }}
<div class="pure-form-message-inline">Restrict API access limit by using <code>x-api-key</code> header</div><br/>
<div class="pure-form-message-inline"><br/>API Key <span id="api-key">{{api_key}}</span>
<span style="display:none;" id="api-key-copy" >copy</span>
</div>
</div>
</div>
<div id="actions">
<div class="pure-control-group">
{{ render_button(form.save_button) }}
<a href="{{url_for('index')}}" class="pure-button button-small button-cancel">Back</a>
<a href="{{url_for('scrub_page')}}" class="pure-button button-small button-cancel">Delete History Snapshot Data</a>
</div>
</div>
</form>
</div>

@ -5,7 +5,7 @@
<script type="text/javascript" src="{{url_for('static_content', group='js', filename='watch-overview.js')}}" defer></script>
<div class="box">
<form class="pure-form" action="{{ url_for('api_watch_add') }}" method="POST" id="new-watch-form">
<form class="pure-form" action="{{ url_for('form_watch_add') }}" method="POST" id="new-watch-form">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
<fieldset>
<legend>Add a new change detection watch</legend>
@ -52,7 +52,7 @@
<td class="title-col inline">{{watch.title if watch.title is not none and watch.title|length > 0 else watch.url}}
<a class="external" target="_blank" rel="noopener" href="{{ watch.url.replace('source:','') }}"></a>
<a href="{{url_for('api_share_put_watch', uuid=watch.uuid)}}"><img style="height: 1em;display:inline-block;" src="{{url_for('static_content', group='images', filename='spread.svg')}}" /></a>
<a href="{{url_for('form_share_put_watch', uuid=watch.uuid)}}"><img style="height: 1em;display:inline-block;" src="{{url_for('static_content', group='images', filename='spread.svg')}}" /></a>
{%if watch.fetch_backend == "html_webdriver" %}<img style="height: 1em; display:inline-block;" src="{{url_for('static_content', group='images', filename='Google-Chrome-icon.png')}}" />{% endif %}
@ -74,7 +74,7 @@
{% endif %}
</td>
<td>
<a {% if watch.uuid in queued_uuids %}disabled="true"{% endif %} href="{{ url_for('api_watch_checknow', uuid=watch.uuid, tag=request.args.get('tag')) }}"
<a {% if watch.uuid in queued_uuids %}disabled="true"{% endif %} href="{{ url_for('form_watch_checknow', uuid=watch.uuid, tag=request.args.get('tag')) }}"
class="recheck pure-button button-small pure-button-primary">{% if watch.uuid in queued_uuids %}Queued{% else %}Recheck{% endif %}</a>
<a href="{{ url_for('edit_page', uuid=watch.uuid)}}" class="pure-button button-small pure-button-primary">Edit</a>
{% if watch.history|length >= 2 %}
@ -96,7 +96,7 @@
</li>
{% endif %}
<li>
<a href="{{ url_for('api_watch_checknow', tag=active_tag) }}" class="pure-button button-tag ">Recheck
<a href="{{ url_for('form_watch_checknow', tag=active_tag) }}" class="pure-button button-tag ">Recheck
all {% if active_tag%}in "{{active_tag}}"{%endif%}</a>
</li>
<li>

@ -4,71 +4,203 @@ import time
from flask import url_for
from .util import live_server_setup
def test_setup(live_server):
live_server_setup(live_server)
import json
import uuid
def set_original_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
<div id="sametext">Some text thats the same</div>
<div id="changetext">Some text that will change</div>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
return None
def set_modified_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>which has this one new line</p>
</br>
So let's see what happens. </br>
<div id="sametext">Some text thats the same</div>
<div id="changetext">Some text that changes</div>
</body>
</html>
"""
def set_response_data(test_return_data):
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
return None
def is_valid_uuid(val):
try:
uuid.UUID(str(val))
return True
except ValueError:
return False
def test_snapshot_api_detects_change(client, live_server):
test_return_data = "Some initial text"
test_return_data_modified = "Some NEW nice initial text"
# kinda funky, but works for now
def _extract_api_key_from_UI(client):
import re
res = client.get(
url_for("settings_page"),
)
# <span id="api-key">{{api_key}}</span>
m = re.search('<span id="api-key">(.+?)</span>', str(res.data))
api_key = m.group(1)
return api_key.strip()
sleep_time_for_fetch_thread = 3
set_response_data(test_return_data)
def test_api_simple(client, live_server):
live_server_setup(live_server)
api_key = _extract_api_key_from_UI(client)
# Give the endpoint time to spin up
time.sleep(1)
# Create a watch
set_original_response()
watch_uuid = None
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="text/plain",
_external=True)
# Validate bad URL
test_url = url_for('test_endpoint', _external=True,
headers={'x-api-key': api_key}, )
res = client.post(
url_for("import_page"),
data={"urls": test_url},
url_for("createwatch"),
data=json.dumps({"url": "h://xxxxxxxxxom"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert b"1 Imported" in res.data
assert res.status_code == 400
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Create new
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url, 'tag': "One, Two", "title": "My test URL"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
s = json.loads(res.data)
assert is_valid_uuid(s['uuid'])
watch_uuid = s['uuid']
assert res.status_code == 201
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
time.sleep(3)
# Verify its in the list and that recheck worked
res = client.get(
url_for("api_snapshot", uuid="first"),
follow_redirects=True
url_for("createwatch"),
headers={'x-api-key': api_key}
)
assert watch_uuid in json.loads(res.data).keys()
before_recheck_info = json.loads(res.data)[watch_uuid]
assert before_recheck_info['last_checked'] != 0
assert before_recheck_info['title'] == 'My test URL'
set_modified_response()
# Trigger recheck of all ?recheck_all=1
client.get(
url_for("createwatch", recheck_all='1'),
headers={'x-api-key': api_key},
)
time.sleep(3)
# Did the recheck fire?
res = client.get(
url_for("createwatch"),
headers={'x-api-key': api_key},
)
after_recheck_info = json.loads(res.data)[watch_uuid]
assert after_recheck_info['last_checked'] != before_recheck_info['last_checked']
assert after_recheck_info['last_changed'] != 0
assert test_return_data.encode() == res.data
# Check history index list
res = client.get(
url_for("watchhistory", uuid=watch_uuid),
headers={'x-api-key': api_key},
)
history = json.loads(res.data)
assert len(history) == 2, "Should have two history entries (the original and the changed)"
# Make a change
set_response_data(test_return_data_modified)
# Fetch a snapshot by timestamp, check the right one was found
res = client.get(
url_for("watchsinglehistory", uuid=watch_uuid, timestamp=list(history.keys())[-1]),
headers={'x-api-key': api_key},
)
assert b'which has this one new line' in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# Fetch a snapshot by 'latest'', check the right one was found
res = client.get(
url_for("watchsinglehistory", uuid=watch_uuid, timestamp='latest'),
headers={'x-api-key': api_key},
)
assert b'which has this one new line' in res.data
# Fetch the whole watch
res = client.get(
url_for("api_snapshot", uuid="first"),
follow_redirects=True
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
watch = json.loads(res.data)
# @todo how to handle None/default global values?
assert watch['history_n'] == 2, "Found replacement history section, which is in its own API"
# Finally delete the watch
res = client.delete(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key},
)
assert res.status_code == 204
assert test_return_data_modified.encode() == res.data
# Check via a relist
res = client.get(
url_for("createwatch"),
headers={'x-api-key': api_key}
)
watch_list = json.loads(res.data)
assert len(watch_list) == 0, "Watch list should be empty"
def test_snapshot_api_invalid_uuid(client, live_server):
def test_access_denied(client, live_server):
# `config_api_token_enabled` Should be On by default
res = client.get(
url_for("api_snapshot", uuid="invalid"),
url_for("createwatch")
)
assert res.status_code == 403
res = client.get(
url_for("createwatch"),
headers={'x-api-key': "something horrible"}
)
assert res.status_code == 403
# Disable config_api_token_enabled and it should work
res = client.post(
url_for("settings_page"),
data={
"requests-time_between_check-minutes": 180,
"application-fetch_backend": "html_requests",
"application-api_access_token_enabled": ""
},
follow_redirects=True
)
assert res.status_code == 400
assert b"Settings updated." in res.data
res = client.get(
url_for("createwatch")
)
assert res.status_code == 200

@ -29,7 +29,7 @@ def test_basic_auth(client, live_server):
assert b"Updated watch." in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(1)
res = client.get(
url_for("preview_page", uuid="first"),

@ -32,7 +32,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
# Do this a few times.. ensures we dont accidently set the status
for n in range(3):
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -65,7 +65,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
assert b'which has this one new line' in res.read()
# Force recheck
res = client.get(url_for("api_watch_checknow"), follow_redirects=True)
res = client.get(url_for("form_watch_checknow"), follow_redirects=True)
assert b'1 watches are queued for rechecking.' in res.data
time.sleep(sleep_time_for_fetch_thread)
@ -93,7 +93,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
# Do this a few times.. ensures we dont accidently set the status
for n in range(2):
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -113,7 +113,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
follow_redirects=True
)
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
@ -123,6 +123,6 @@ def test_check_basic_change_detection_functionality(client, live_server):
#
# Cleanup everything
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data

@ -23,7 +23,7 @@ def test_trigger_functionality(client, live_server):
res = client.get(
url_for("api_clone", uuid="first"),
url_for("form_clone", uuid="first"),
follow_redirects=True
)

@ -89,7 +89,7 @@ def test_check_markup_css_filter_restriction(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -110,7 +110,7 @@ def test_check_markup_css_filter_restriction(client, live_server):
assert bytes(css_filter.encode('utf-8')) in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -118,7 +118,7 @@ def test_check_markup_css_filter_restriction(client, live_server):
set_modified_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)

@ -145,7 +145,7 @@ def test_element_removal_full(client, live_server):
assert bytes(subtractive_selectors_data.encode("utf-8")) in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -158,7 +158,7 @@ def test_element_removal_full(client, live_server):
set_modified_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)

@ -39,7 +39,7 @@ def test_check_encoding_detection(client, live_server):
)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(2)
@ -71,7 +71,7 @@ def test_check_encoding_detection_missing_content_type_header(client, live_serve
)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(2)

@ -29,7 +29,7 @@ def test_error_handler(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
@ -54,7 +54,7 @@ def test_error_text_handler(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)

@ -102,7 +102,7 @@ def test_check_ignore_text_functionality(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -123,7 +123,7 @@ def test_check_ignore_text_functionality(client, live_server):
assert bytes(ignore_text.encode('utf-8')) in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -137,7 +137,7 @@ def test_check_ignore_text_functionality(client, live_server):
set_modified_ignore_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -152,7 +152,7 @@ def test_check_ignore_text_functionality(client, live_server):
# Just to be sure.. set a regular modified change..
set_modified_original_ignore_response()
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
@ -165,7 +165,7 @@ def test_check_ignore_text_functionality(client, live_server):
# We should be able to see what we ignored
assert b'<div class="ignored">new ignore stuff' in res.data
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_check_global_ignore_text_functionality(client, live_server):
@ -200,7 +200,7 @@ def test_check_global_ignore_text_functionality(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -222,7 +222,7 @@ def test_check_global_ignore_text_functionality(client, live_server):
assert bytes(ignore_text.encode('utf-8')) in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -240,7 +240,7 @@ def test_check_global_ignore_text_functionality(client, live_server):
set_modified_ignore_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -251,10 +251,10 @@ def test_check_global_ignore_text_functionality(client, live_server):
# Just to be sure.. set a regular modified change that will trigger it
set_modified_original_ignore_response()
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' in res.data
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data

@ -72,14 +72,14 @@ def test_render_anchor_tag_content_true(client, live_server):
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# set a new html text with a modified link
set_modified_ignore_response()
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -101,7 +101,7 @@ def test_render_anchor_tag_content_true(client, live_server):
assert b"Settings updated." in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -119,7 +119,7 @@ def test_render_anchor_tag_content_true(client, live_server):
assert b"/test-endpoint" in res.data
# Cleanup everything
res = client.get(url_for("api_delete", uuid="all"),
res = client.get(url_for("form_delete", uuid="all"),
follow_redirects=True)
assert b'Deleted' in res.data

@ -70,12 +70,12 @@ def test_normal_page_check_works_with_ignore_status_code(client, live_server):
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
set_some_changed_response()
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -105,7 +105,7 @@ def test_403_page_check_works_with_ignore_status_code(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -120,7 +120,7 @@ def test_403_page_check_works_with_ignore_status_code(client, live_server):
assert b"Updated watch." in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -128,7 +128,7 @@ def test_403_page_check_works_with_ignore_status_code(client, live_server):
set_some_changed_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -157,7 +157,7 @@ def test_403_page_check_fails_without_ignore_status_code(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -172,7 +172,7 @@ def test_403_page_check_fails_without_ignore_status_code(client, live_server):
assert b"Updated watch." in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -180,7 +180,7 @@ def test_403_page_check_fails_without_ignore_status_code(client, live_server):
set_some_changed_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)

@ -80,12 +80,12 @@ def test_check_ignore_whitespace(client, live_server):
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
set_original_ignore_response_but_with_whitespace()
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)

@ -25,7 +25,7 @@ https://example.com tag1, other tag"""
assert b"3 Imported" in res.data
assert b"tag1" in res.data
assert b"other tag" in res.data
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
# Clear flask alerts
res = client.get( url_for("index"))
@ -50,7 +50,7 @@ def xtest_import_skip_url(client, live_server):
assert b"1 Imported" in res.data
assert b"ht000000broken" in res.data
assert b"1 Skipped" in res.data
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
# Clear flask alerts
res = client.get( url_for("index"))
@ -79,7 +79,7 @@ def test_import_distillio(client, live_server):
# Give the endpoint time to spin up
time.sleep(1)
client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
res = client.post(
url_for("import_page"),
data={
@ -115,6 +115,6 @@ def test_import_distillio(client, live_server):
assert b"nice stuff" in res.data
assert b"nerd-news" in res.data
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
# Clear flask alerts
res = client.get(url_for("index"))

@ -171,7 +171,7 @@ def test_check_json_without_filter(client, live_server):
)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
@ -203,7 +203,7 @@ def test_check_json_filter(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
@ -229,7 +229,7 @@ def test_check_json_filter(client, live_server):
assert bytes(json_filter.encode('utf-8')) in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
@ -237,7 +237,7 @@ def test_check_json_filter(client, live_server):
set_modified_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(4)
@ -288,7 +288,7 @@ def test_check_json_filter_bool_val(client, live_server):
time.sleep(3)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
@ -296,7 +296,7 @@ def test_check_json_filter_bool_val(client, live_server):
set_modified_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
@ -327,7 +327,7 @@ def test_check_json_ext_filter(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
@ -353,7 +353,7 @@ def test_check_json_ext_filter(client, live_server):
assert bytes(json_filter.encode('utf-8')) in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)
@ -361,7 +361,7 @@ def test_check_json_ext_filter(client, live_server):
set_modified_ext_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(4)

@ -39,7 +39,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
# Do this a few times.. ensures we dont accidently set the status
for n in range(3):
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -61,7 +61,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
# this should not trigger a change, because no good text could be converted from the HTML
set_nonrenderable_response()
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -83,7 +83,7 @@ def test_check_basic_change_detection_functionality(client, live_server):
set_modified_response()
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -97,6 +97,6 @@ def test_check_basic_change_detection_functionality(client, live_server):
#
# Cleanup everything
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data

@ -36,7 +36,7 @@ def test_check_notification(client, live_server):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("api_watch_add"),
url_for("form_watch_add"),
data={"url": test_url, "tag": ''},
follow_redirects=True
)
@ -98,7 +98,7 @@ def test_check_notification(client, live_server):
notification_submission = None
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(3)
# Verify what was sent as a notification, this file should exist
with open("test-datastore/notification.txt", "r") as f:
@ -133,7 +133,7 @@ def test_check_notification(client, live_server):
# This should insert the {current_snapshot}
set_more_modified_response()
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(3)
# Verify what was sent as a notification, this file should exist
with open("test-datastore/notification.txt", "r") as f:
@ -146,17 +146,17 @@ def test_check_notification(client, live_server):
os.unlink("test-datastore/notification.txt")
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(1)
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(1)
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(1)
assert os.path.exists("test-datastore/notification.txt") == False
# cleanup for the next
client.get(
url_for("api_delete", uuid="all"),
url_for("form_delete", uuid="all"),
follow_redirects=True
)
@ -168,7 +168,7 @@ def test_notification_validation(client, live_server):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("api_watch_add"),
url_for("form_watch_add"),
data={"url": test_url, "tag": 'nice one'},
follow_redirects=True
)
@ -208,6 +208,6 @@ def test_notification_validation(client, live_server):
# cleanup for the next
client.get(
url_for("api_delete", uuid="all"),
url_for("form_delete", uuid="all"),
follow_redirects=True
)

@ -16,7 +16,7 @@ def test_check_notification_error_handling(client, live_server):
# use a different URL so that it doesnt interfere with the actual check until we are ready
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("api_watch_add"),
url_for("form_watch_add"),
data={"url": "https://changedetection.io/CHANGELOG.txt", "tag": ''},
follow_redirects=True
)

@ -41,7 +41,7 @@ def test_share_watch(client, live_server):
# click share the link
res = client.get(
url_for("api_share_put_watch", uuid="first"),
url_for("form_share_put_watch", uuid="first"),
follow_redirects=True
)
@ -54,7 +54,7 @@ def test_share_watch(client, live_server):
# Now delete what we have, we will try to re-import it
# Cleanup everything
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
# Add our URL to the import page

@ -39,7 +39,7 @@ def test_check_basic_change_detection_functionality_source(client, live_server):
set_modified_response()
# Force recheck
res = client.get(url_for("api_watch_checknow"), follow_redirects=True)
res = client.get(url_for("form_watch_checknow"), follow_redirects=True)
assert b'1 watches are queued for rechecking.' in res.data
time.sleep(5)

@ -76,7 +76,7 @@ def test_trigger_functionality(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -99,7 +99,7 @@ def test_trigger_functionality(client, live_server):
assert bytes(trigger_text.encode('utf-8')) in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -113,7 +113,7 @@ def test_trigger_functionality(client, live_server):
set_modified_original_ignore_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -125,7 +125,7 @@ def test_trigger_functionality(client, live_server):
time.sleep(sleep_time_for_fetch_thread)
set_modified_with_trigger_text_response()
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' in res.data

@ -43,7 +43,7 @@ def test_trigger_regex_functionality(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -65,7 +65,7 @@ def test_trigger_regex_functionality(client, live_server):
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("some new noise")
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (nothing should match the regex)
@ -75,7 +75,7 @@ def test_trigger_regex_functionality(client, live_server):
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("regex test123<br/>\nsomething 123")
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' in res.data

@ -43,7 +43,7 @@ def test_trigger_regex_functionality(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -66,7 +66,7 @@ def test_trigger_regex_functionality(client, live_server):
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("<html>some new noise with cool stuff2 ok</html>")
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (nothing should match the regex and filter)
@ -76,7 +76,7 @@ def test_trigger_regex_functionality(client, live_server):
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("<html>some new noise with <span id=in-here>cool stuff6</span> ok</html>")
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
assert b'unviewed' in res.data

@ -65,7 +65,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -89,7 +89,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
set_modified_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
client.get(url_for("form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@ -121,7 +121,7 @@ def test_xpath_validation(client, live_server):
# actually only really used by the distll.io importer, but could be handy too
def test_check_with_prefix_css_filter(client, live_server):
res = client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
# Give the endpoint time to spin up
@ -158,4 +158,4 @@ def test_check_with_prefix_css_filter(client, live_server):
assert b"Some text thats the same" in res.data #in selector
assert b"Some text that will change" not in res.data #not in selector
client.get(url_for("api_delete", uuid="all"), follow_redirects=True)
client.get(url_for("form_delete", uuid="all"), follow_redirects=True)

@ -6,6 +6,7 @@ timeago ~=1.0
inscriptis ~= 2.2
feedgen ~= 0.9
flask-login ~= 0.5
flask_restful
pytz
# Set these versions together to avoid a RequestsDependencyWarning

Loading…
Cancel
Save