Finalse pytest methods

pull/11/head
Leigh Morresi 4 years ago
parent b46a7fc4b1
commit 1718e2e86f

@ -0,0 +1 @@
Note: run `pytest` from this directory.

@ -91,7 +91,7 @@ def changedetection_app(config=None, datastore_o=None):
# You can divide up the stuff like this # You can divide up the stuff like this
@app.route("/", methods=['GET']) @app.route("/", methods=['GET'])
def main_page(): def index():
global messages global messages
limit_tag = request.args.get('tag') limit_tag = request.args.get('tag')
@ -152,7 +152,7 @@ def changedetection_app(config=None, datastore_o=None):
else: else:
messages.append({'class': 'error', 'message': 'Wrong confirm text.'}) messages.append({'class': 'error', 'message': 'Wrong confirm text.'})
return redirect(url_for('main_page')) return redirect(url_for('index'))
return render_template("scrub.html") return render_template("scrub.html")
@ -184,7 +184,7 @@ def changedetection_app(config=None, datastore_o=None):
messages.append({'class': 'ok', 'message': 'Updated watch.'}) messages.append({'class': 'ok', 'message': 'Updated watch.'})
return redirect(url_for('main_page')) return redirect(url_for('index'))
else: else:
@ -230,7 +230,9 @@ def changedetection_app(config=None, datastore_o=None):
for url in urls: for url in urls:
url = url.strip() url = url.strip()
if len(url) and validators.url(url): if len(url) and validators.url(url):
datastore.add_watch(url=url.strip(), tag="") new_uuid = datastore.add_watch(url=url.strip(), tag="")
# Straight into the queue.
update_q.put(new_uuid)
good += 1 good += 1
else: else:
if len(url): if len(url):
@ -239,7 +241,7 @@ def changedetection_app(config=None, datastore_o=None):
messages.append({'class': 'ok', 'message': "{} Imported, {} Skipped.".format(good, len(remaining_urls))}) messages.append({'class': 'ok', 'message': "{} Imported, {} Skipped.".format(good, len(remaining_urls))})
if len(remaining_urls) == 0: if len(remaining_urls) == 0:
return redirect(url_for('main_page')) return redirect(url_for('index'))
else: else:
output = render_template("import.html", output = render_template("import.html",
messages=messages, messages=messages,
@ -353,7 +355,7 @@ def changedetection_app(config=None, datastore_o=None):
update_q.put(new_uuid) update_q.put(new_uuid)
messages.append({'class': 'ok', 'message': 'Watch added.'}) messages.append({'class': 'ok', 'message': 'Watch added.'})
return redirect(url_for('main_page')) return redirect(url_for('index'))
@app.route("/api/delete", methods=['GET']) @app.route("/api/delete", methods=['GET'])
@ -363,7 +365,7 @@ def changedetection_app(config=None, datastore_o=None):
datastore.delete(uuid) datastore.delete(uuid)
messages.append({'class': 'ok', 'message': 'Deleted.'}) messages.append({'class': 'ok', 'message': 'Deleted.'})
return redirect(url_for('main_page')) return redirect(url_for('index'))
@app.route("/api/checknow", methods=['GET']) @app.route("/api/checknow", methods=['GET'])
@ -375,28 +377,34 @@ def changedetection_app(config=None, datastore_o=None):
uuid = request.args.get('uuid') uuid = request.args.get('uuid')
i=0 i=0
running_uuids=[]
for t in running_update_threads:
running_uuids.append(t.current_uuid)
# @todo check thread is running and skip
if uuid: if uuid:
if not uuid in running_uuids:
update_q.put(uuid) update_q.put(uuid)
i = 1 i = 1
elif tag != None: elif tag != None:
# Items that have this current tag
for watch_uuid, watch in datastore.data['watching'].items(): for watch_uuid, watch in datastore.data['watching'].items():
if (tag != None and tag in watch['tag']): if (tag != None and tag in watch['tag']):
i += 1 i += 1
if not watch_uuid in running_uuids:
update_q.put(watch_uuid) update_q.put(watch_uuid)
else: else:
# No tag, no uuid, add everything. # No tag, no uuid, add everything.
for watch_uuid, watch in datastore.data['watching'].items(): for watch_uuid, watch in datastore.data['watching'].items():
i += 1 i += 1
if not watch_uuid in running_uuids:
update_q.put(watch_uuid) update_q.put(watch_uuid)
messages.append({'class': 'ok', 'message': "{} watches are rechecking.".format(i)}) messages.append({'class': 'ok', 'message': "{} watches are rechecking.".format(i)})
return redirect(url_for('main_page', tag=tag)) return redirect(url_for('index', tag=tag))
# for pytest flask
@app.route("/timestamp", methods=['GET'])
def api_test_rand_int():
return str(time.time())
# @todo handle ctrl break # @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start() ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
@ -423,7 +431,7 @@ class Worker(threading.Thread):
while True: while True:
try: try:
uuid = self.q.get(block=True, timeout=1) # Blocking uuid = self.q.get(block=True, timeout=1)
except queue.Empty: except queue.Empty:
# We have a chance to kill this thread that needs to monitor for new jobs.. # We have a chance to kill this thread that needs to monitor for new jobs..
# Delays here would be caused by a current response object pending # Delays here would be caused by a current response object pending
@ -442,6 +450,8 @@ class Worker(threading.Thread):
app.logger.error("File permission error updating", uuid, str(s)) app.logger.error("File permission error updating", uuid, str(s))
else: else:
if result: if result:
result["previous_md5"] = result["current_md5"]
datastore.update_watch(uuid=uuid, update_obj=result) datastore.update_watch(uuid=uuid, update_obj=result)
if contents: if contents:
@ -468,13 +478,23 @@ def ticker_thread_check_time_launch_checks():
# Every minute check for new UUIDs to follow up on # Every minute check for new UUIDs to follow up on
while True: while True:
if app.config['STOP_THREADS']:
return
running_uuids=[]
for t in running_update_threads:
running_uuids.append(t.current_uuid)
# Look at the dataset, find a stale watch to process
minutes = datastore.data['settings']['requests']['minutes_between_check'] minutes = datastore.data['settings']['requests']['minutes_between_check']
for uuid, watch in datastore.data['watching'].items(): for uuid, watch in datastore.data['watching'].items():
if watch['last_checked'] <= time.time() - (minutes * 60): if watch['last_checked'] <= time.time() - (minutes * 60):
update_q.put(uuid)
if app.config['STOP_THREADS']: # @todo maybe update_q.queue is enough?
return if not uuid in running_uuids and uuid not in update_q.queue:
update_q.put(uuid)
# Should be low so we can break this out in testing
time.sleep(1) time.sleep(1)

@ -5,8 +5,6 @@ import os
import re import re
from inscriptis import get_text from inscriptis import get_text
from copy import deepcopy
# Some common stuff here that can be moved to a base class # Some common stuff here that can be moved to a base class
class perform_site_check(): class perform_site_check():
@ -17,7 +15,6 @@ class perform_site_check():
def run(self, uuid): def run(self, uuid):
timestamp = int(time.time()) # used for storage etc too timestamp = int(time.time()) # used for storage etc too
stripped_text_from_html = False stripped_text_from_html = False
@ -45,7 +42,9 @@ class perform_site_check():
timeout = 15 timeout = 15
try: try:
r = requests.get(self.datastore.get_val(uuid, 'url'), url = self.datastore.get_val(uuid, 'url')
r = requests.get(url,
headers=request_headers, headers=request_headers,
timeout=timeout, timeout=timeout,
verify=False) verify=False)
@ -53,7 +52,6 @@ class perform_site_check():
stripped_text_from_html = get_text(r.text) stripped_text_from_html = get_text(r.text)
# Usually from networkIO/requests level # Usually from networkIO/requests level
except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e: except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e:
update_obj["last_error"] = str(e) update_obj["last_error"] = str(e)
@ -90,6 +88,7 @@ class perform_site_check():
if self.datastore.get_val(uuid, 'previous_md5'): if self.datastore.get_val(uuid, 'previous_md5'):
update_obj["last_changed"] = timestamp update_obj["last_changed"] = timestamp
update_obj["previous_md5"] = fetched_md5
update_obj["current_md5"] = fetched_md5
return update_obj, stripped_text_from_html return update_obj, stripped_text_from_html

@ -91,10 +91,10 @@ class ChangeDetectionStore:
# First time ran, doesnt exist. # First time ran, doesnt exist.
except (FileNotFoundError, json.decoder.JSONDecodeError): except (FileNotFoundError, json.decoder.JSONDecodeError):
print("Creating JSON store at", self.datastore_path) print("Creating JSON store at", self.datastore_path)
self.add_watch(url='http://www.quotationspage.com/random.php', tag='test') #self.add_watch(url='http://www.quotationspage.com/random.php', tag='test')
self.add_watch(url='https://news.ycombinator.com/', tag='Tech news') #self.add_watch(url='https://news.ycombinator.com/', tag='Tech news')
self.add_watch(url='https://www.gov.uk/coronavirus', tag='Covid') #self.add_watch(url='https://www.gov.uk/coronavirus', tag='Covid')
self.add_watch(url='https://changedetection.io', tag='Tech news') #self.add_watch(url='https://changedetection.io', tag='Tech news')
# Finally start the thread that will manage periodic data saves to JSON # Finally start the thread that will manage periodic data saves to JSON
save_data_thread = threading.Thread(target=self.save_datastore).start() save_data_thread = threading.Thread(target=self.save_datastore).start()
@ -198,7 +198,7 @@ class ChangeDetectionStore:
def save_history_text(self, uuid, result_obj, contents): def save_history_text(self, uuid, result_obj, contents):
output_path = "{}/{}".format(self.datastore_path, uuid) output_path = "{}/{}".format(self.datastore_path, uuid)
fname = "{}/{}.stripped.txt".format(output_path, result_obj['previous_md5']) fname = "{}/{}-{}.stripped.txt".format(output_path, result_obj['current_md5'], str(time.time()))
with open(fname, 'w') as f: with open(fname, 'w') as f:
f.write(contents) f.write(contents)
f.close() f.close()

@ -10,33 +10,39 @@ from backend import store
# Much better boilerplate than the docs # Much better boilerplate than the docs
# https://www.python-boilerplate.com/py3+flask+pytest/ # https://www.python-boilerplate.com/py3+flask+pytest/
global app
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def app(request): def app(request):
"""Create application for the tests.""" """Create application for the tests."""
datastore_path = "./test-datastore" datastore_path = "./test-datastore"
import os
try:
os.unlink("{}/url-watches.json".format(datastore_path))
except FileNotFoundError:
pass
app_config = {'datastore_path': datastore_path} app_config = {'datastore_path': datastore_path}
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path']) datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'])
_app = changedetection_app(app_config, datastore) app = changedetection_app(app_config, datastore)
# Establish an application context before running the tests. # Establish an application context before running the tests.
ctx = _app.app_context() #ctx = _app.app_context()
ctx.push() #ctx.push()
def teardown(): def teardown():
ctx.pop() datastore.stop_thread = True
app.config['STOP_THREADS']= True
request.addfinalizer(teardown) request.addfinalizer(teardown)
return _app return app
@pytest.fixture(scope='session') #@pytest.fixture(scope='session')
def client(app): #def client(app):
with app.test_client() as client: # with app.test_client() as client:
yield client # yield client
@pytest.fixture(scope='function')
def session(request):
"""Creates a new database session for a test."""
return session

@ -1,39 +1,98 @@
#!/usr/bin/python3 #!/usr/bin/python3
import pytest
import backend
from backend import store
import os
import time import time
import requests import pytest
# https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py from flask import url_for
from urllib.request import urlopen
def set_original_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
</body>
</html>
"""
with open("test-datastore/output.txt", "w") as f:
f.write(test_return_data)
def set_modified_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>which has this one new line</p>
</br>
So let's see what happens. </br>
</body>
</html>
"""
with open("test-datastore/output.txt", "w") as f:
f.write(test_return_data)
def test_add_endpoint_to_live_server(client, live_server):
sleep_time_for_fetch_thread = 3
@live_server.app.route('/test-endpoint')
def test_endpoint():
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open("test-datastore/output.txt", "r") as f:
return f.read()
set_original_response()
live_server.start()
# Add our URL to the import page
res = client.post(
url_for("import_page"),
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
assert b'test-endpoint' in res.data
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("index"))
# Much better boilerplate than the docs assert b'unviewed' not in res.data
# https://www.python-boilerplate.com/py3+flask+pytest/
#####################
def test_import(client):
res = client.get("/")
assert b"IMPORT" in res.data
assert res.status_code == 200
test_url_list = ["https://slashdot.org"] # Make a change
res = client.post('/import', data={'urls': "\n".join(test_url_list)}, follow_redirects=True) set_modified_response()
s = "{} Imported".format(len(test_url_list))
#p= url_for('test_endpoint', _external=True res = urlopen(url_for('test_endpoint', _external=True))
assert b'which has this one new line' in res.read()
assert bytes(s.encode('utf-8')) in res.data
for url in test_url_list: # Force recheck
assert bytes(url.encode('utf-8')) in res.data res = client.get(url_for("api_watch_checknow"), follow_redirects=True)
assert b'1 watches are rechecking.' in res.data
#response = requests.get('http://localhost:5000/random_string') time.sleep(sleep_time_for_fetch_thread)
#assert response.status_code == 200
#assert response.json() == [{'id': 1}]
# Now something should be ready, indicated by having a 'unviewed' class
res = client.get(url_for("index"))
assert b'unviewed' in res.data
def test_import_a(client):
res = client.get("/")
assert b"IMPORT" in res.data
assert res.status_code == 200

@ -7,6 +7,9 @@ six==1.10.0
yarl yarl
flask flask
pytest
pytest-flask # for live_server
eventlet eventlet
requests requests
validators validators

Loading…
Cancel
Save