Use g and current_app proxies

This should make the code a little cleaner to use these proxy objects.
pull/2790/head
Kenny Root 4 weeks ago
parent 10068e1f24
commit 25408a0aab

@ -19,19 +19,19 @@ from . import __version__
# Parent wrapper or OS sends us a SIGTERM/SIGINT, do everything required for a clean shutdown # Parent wrapper or OS sends us a SIGTERM/SIGINT, do everything required for a clean shutdown
class SigShutdownHandler(object): class SigShutdownHandler(object):
def __init__(self, app): def __init__(self, app: Flask, datastore: store.ChangeDetectionStore):
self.app = app self.app = app
self.datastore = datastore
signal.signal(signal.SIGTERM, lambda _signum, _frame: self._signal_handler("SIGTERM")) signal.signal(signal.SIGTERM, lambda _signum, _frame: self._signal_handler("SIGTERM"))
signal.signal(signal.SIGINT, lambda _signum, _frame: self._signal_handler("SIGINT")) signal.signal(signal.SIGINT, lambda _signum, _frame: self._signal_handler("SIGINT"))
def _signal_handler(self, signame): def _signal_handler(self, signame):
logger.critical(f'Shutdown: Got Signal - {signame}, Saving DB to disk and calling shutdown') logger.critical(f'Shutdown: Got Signal - {signame}, Saving DB to disk and calling shutdown')
datastore = self.app.config["DATASTORE"] self.datastore.sync_to_json()
datastore.sync_to_json()
logger.success('Sync JSON to disk complete.') logger.success('Sync JSON to disk complete.')
# This will throw a SystemExit exception, because eventlet.wsgi.server doesn't know how to deal with it. # This will throw a SystemExit exception, because eventlet.wsgi.server doesn't know how to deal with it.
# Solution: move to gevent or other server in the future (#2014) # Solution: move to gevent or other server in the future (#2014)
datastore.stop_thread = True self.datastore.stop_thread = True
self.app.config.exit.set() self.app.config.exit.set()
sys.exit(0) sys.exit(0)
@ -136,7 +136,7 @@ def create_application() -> Flask:
app = changedetection_app(app_config, datastore) app = changedetection_app(app_config, datastore)
sigshutdown_handler = SigShutdownHandler(app) sigshutdown_handler = SigShutdownHandler(app, datastore)
# Go into cleanup mode # Go into cleanup mode
if do_cleanup: if do_cleanup:

@ -22,6 +22,7 @@ from feedgen.feed import FeedGenerator
from flask import ( from flask import (
Flask, Flask,
abort, abort,
current_app,
flash, flash,
g, g,
make_response, make_response,
@ -133,17 +134,17 @@ def login_optionally_required(func):
has_password_enabled = g.datastore.data['settings']['application'].get('password') or os.getenv("SALTED_PASS", False) has_password_enabled = g.datastore.data['settings']['application'].get('password') or os.getenv("SALTED_PASS", False)
# Permitted # Permitted
if request.endpoint == 'static_content' and request.view_args['group'] == 'styles': if request.endpoint == 'static_content' and request.view_args and request.view_args['group'] == 'styles':
return func(*args, **kwargs) return func(*args, **kwargs)
# Permitted # Permitted
elif request.endpoint == 'diff_history_page' and g.datastore.data['settings']['application'].get('shared_diff_access'): elif request.endpoint == 'diff_history_page' and g.datastore.data['settings']['application'].get('shared_diff_access'):
return func(*args, **kwargs) return func(*args, **kwargs)
elif request.method in flask_login.config.EXEMPT_METHODS: elif request.method in flask_login.config.EXEMPT_METHODS:
return func(*args, **kwargs) return func(*args, **kwargs)
elif g.app.config.get('LOGIN_DISABLED'): elif current_app.config.get('LOGIN_DISABLED'):
return func(*args, **kwargs) return func(*args, **kwargs)
elif has_password_enabled and not current_user.is_authenticated: elif has_password_enabled and not current_user.is_authenticated:
return g.app.login_manager.unauthorized() return current_app.login_manager.unauthorized()
return func(*args, **kwargs) return func(*args, **kwargs)
@ -165,7 +166,8 @@ def changedetection_app(config, datastore):
# Stop browser caching of assets # Stop browser caching of assets
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0 app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
app.config.exit = Event() exit_event = Event()
app.config.exit = exit_event
app.config['NEW_VERSION_AVAILABLE'] = False app.config['NEW_VERSION_AVAILABLE'] = False
@ -182,8 +184,6 @@ def changedetection_app(config, datastore):
app.config["notification_debug_log"] = [] app.config["notification_debug_log"] = []
app.config['DATASTORE'] = datastore
login_manager = flask_login.LoginManager(app) login_manager = flask_login.LoginManager(app)
login_manager.login_view = 'login' login_manager.login_view = 'login'
app.secret_key = init_app_secret(config['datastore_path']) app.secret_key = init_app_secret(config['datastore_path'])
@ -322,7 +322,6 @@ def changedetection_app(config, datastore):
@app.before_request @app.before_request
def remember_app_and_datastore(): def remember_app_and_datastore():
g.app = app
g.datastore = datastore g.datastore = datastore
@app.before_request @app.before_request
@ -1630,25 +1629,23 @@ def changedetection_app(config, datastore):
# @todo handle ctrl break # @todo handle ctrl break
threading.Thread(target=ticker_thread_check_time_launch_checks, args=(app,)).start() threading.Thread(target=ticker_thread_check_time_launch_checks, kwargs={'app': app, 'datastore': datastore, 'exit_event': exit_event}).start()
threading.Thread(target=notification_runner, args=(app,)).start() threading.Thread(target=notification_runner, kwargs={'app': app, 'datastore': datastore, 'exit_event': exit_event}).start()
# Check for new release version, but not when running in test/build or pytest # Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')): if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')):
threading.Thread(target=check_for_new_version, args=(app,)).start() threading.Thread(target=check_for_new_version, kwargs={'app': app, 'datastore': datastore, 'exit_event': exit_event}).start()
return app return app
# Check for new version and anonymous stats # Check for new version and anonymous stats
def check_for_new_version(app, url="https://changedetection.io/check-ver.php", delay_time=86400): def check_for_new_version(*, app, datastore, exit_event, url="https://changedetection.io/check-ver.php", delay_time=86400):
import requests import requests
import urllib3 import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
datastore = app.config["DATASTORE"] while not exit_event.is_set():
while not app.config.exit.is_set():
try: try:
r = requests.post(url, r = requests.post(url,
data={'version': __version__, data={'version': __version__,
@ -1667,13 +1664,13 @@ def check_for_new_version(app, url="https://changedetection.io/check-ver.php", d
pass pass
# Check daily # Check daily
app.config.exit.wait(delay_time) exit_event.wait(delay_time)
def notification_runner(app): def notification_runner(*, app, datastore, exit_event):
from datetime import datetime from datetime import datetime
import json import json
while not app.config.exit.is_set(): while not exit_event.is_set():
try: try:
# At the moment only one thread runs (single runner) # At the moment only one thread runs (single runner)
n_object = notification_q.get(block=False) n_object = notification_q.get(block=False)
@ -1687,8 +1684,6 @@ def notification_runner(app):
notification_debug_log = app.config["notification_debug_log"] notification_debug_log = app.config["notification_debug_log"]
datastore = app.config["DATASTORE"]
try: try:
from changedetectionio import notification from changedetectionio import notification
# Fallback to system config if not set # Fallback to system config if not set
@ -1720,12 +1715,10 @@ def notification_runner(app):
notification_debug_log = notification_debug_log[-100:] notification_debug_log = notification_debug_log[-100:]
# Threaded runner, look for new watches to feed into the Queue. # Threaded runner, look for new watches to feed into the Queue.
def ticker_thread_check_time_launch_checks(app): def ticker_thread_check_time_launch_checks(*, app, datastore, exit_event):
import random import random
from changedetectionio import update_worker from changedetectionio import update_worker
datastore = app.config["DATASTORE"]
proxy_last_called_time = {} proxy_last_called_time = {}
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)) recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
@ -1739,7 +1732,7 @@ def ticker_thread_check_time_launch_checks(app):
running_update_threads.append(new_worker) running_update_threads.append(new_worker)
new_worker.start() new_worker.start()
while not app.config.exit.is_set(): while not exit_event.is_set():
# Get a list of watches by UUID that are currently fetching data # Get a list of watches by UUID that are currently fetching data
running_uuids = [] running_uuids = []
@ -1835,4 +1828,4 @@ def ticker_thread_check_time_launch_checks(app):
time.sleep(1) time.sleep(1)
# Should be low so we can break this out in testing # Should be low so we can break this out in testing
app.config.exit.wait(1) exit_event.wait(1)

@ -1,5 +1,5 @@
import os import os
from flask import url_for from flask import url_for, g
from ..util import live_server_setup, wait_for_all_checks, extract_UUID_from_client from ..util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
@ -35,7 +35,7 @@ def test_execute_custom_js(client, live_server, measure_memory_usage):
wait_for_all_checks(client) wait_for_all_checks(client)
uuid = extract_UUID_from_client(client) uuid = extract_UUID_from_client(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)" assert g.datastore.data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)"
assert b"This text should be removed" not in res.data assert b"This text should be removed" not in res.data

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import time import time
from flask import url_for from flask import url_for, g
from ..util import live_server_setup, wait_for_all_checks, extract_UUID_from_client from ..util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
@ -73,5 +73,5 @@ def test_noproxy_option(client, live_server, measure_memory_usage):
# Prove that it actually checked # Prove that it actually checked
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['last_checked'] != 0 assert g.datastore.data['watching'][uuid]['last_checked'] != 0

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import time import time
from flask import url_for from flask import url_for, g
from .util import live_server_setup, extract_UUID_from_client, extract_api_key_from_UI, wait_for_all_checks from .util import live_server_setup, extract_UUID_from_client, extract_api_key_from_UI, wait_for_all_checks
@ -154,7 +154,7 @@ def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_
assert b"1 Imported" in res.data assert b"1 Imported" in res.data
wait_for_all_checks(client) wait_for_all_checks(client)
for k,v in client.application.config.get('DATASTORE').data['watching'].items(): for k,v in g.datastore.data['watching'].items():
assert v.get('last_error') == False assert v.get('last_error') == False
assert v.get('has_ldjson_price_data') == has_ldjson_price_data, f"Detected LDJSON data? should be {has_ldjson_price_data}" assert v.get('has_ldjson_price_data') == has_ldjson_price_data, f"Detected LDJSON data? should be {has_ldjson_price_data}"

@ -2,7 +2,7 @@
# coding=utf-8 # coding=utf-8
import time import time
from flask import url_for from flask import url_for, g
from .util import live_server_setup, wait_for_all_checks, extract_UUID_from_client from .util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
import pytest import pytest
@ -41,7 +41,7 @@ def test_check_encoding_detection(client, live_server, measure_memory_usage):
# Content type recording worked # Content type recording worked
uuid = extract_UUID_from_client(client) uuid = extract_UUID_from_client(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['content-type'] == "text/html" assert g.datastore.data['watching'][uuid]['content-type'] == "text/html"
res = client.get( res = client.get(
url_for("preview_page", uuid="first"), url_for("preview_page", uuid="first"),

@ -1,7 +1,7 @@
import os import os
import time import time
from loguru import logger from loguru import logger
from flask import url_for from flask import url_for, g
from .util import set_original_response, live_server_setup, extract_UUID_from_client, wait_for_all_checks, \ from .util import set_original_response, live_server_setup, extract_UUID_from_client, wait_for_all_checks, \
wait_for_notification_endpoint_output wait_for_notification_endpoint_output
from changedetectionio.model import App from changedetectionio.model import App
@ -53,7 +53,7 @@ def run_filter_test(client, live_server, content_filter):
uuid = extract_UUID_from_client(client) uuid = extract_UUID_from_client(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 0, "No filter = No filter failure" assert g.datastore.data['watching'][uuid]['consecutive_filter_failures'] == 0, "No filter = No filter failure"
watch_data = {"notification_urls": notification_url, watch_data = {"notification_urls": notification_url,
"notification_title": "New ChangeDetection.io Notification - {{watch_url}}", "notification_title": "New ChangeDetection.io Notification - {{watch_url}}",
@ -86,7 +86,7 @@ def run_filter_test(client, live_server, content_filter):
) )
assert b"Updated watch." in res.data assert b"Updated watch." in res.data
wait_for_all_checks(client) wait_for_all_checks(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 0, "No filter = No filter failure" assert g.datastore.data['watching'][uuid]['consecutive_filter_failures'] == 0, "No filter = No filter failure"
# Now add a filter, because recheck hours == 5, ONLY pressing of the [edit] or [recheck all] should trigger # Now add a filter, because recheck hours == 5, ONLY pressing of the [edit] or [recheck all] should trigger
watch_data['include_filters'] = content_filter watch_data['include_filters'] = content_filter
@ -103,12 +103,12 @@ def run_filter_test(client, live_server, content_filter):
assert not os.path.isfile("test-datastore/notification.txt") assert not os.path.isfile("test-datastore/notification.txt")
# Hitting [save] would have triggered a recheck, and we have a filter, so this would be ONE failure # Hitting [save] would have triggered a recheck, and we have a filter, so this would be ONE failure
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 1, "Should have been checked once" assert g.datastore.data['watching'][uuid]['consecutive_filter_failures'] == 1, "Should have been checked once"
# recheck it up to just before the threshold, including the fact that in the previous POST it would have rechecked (and incremented) # recheck it up to just before the threshold, including the fact that in the previous POST it would have rechecked (and incremented)
# Add 4 more checks # Add 4 more checks
checked = 0 checked = 0
ATTEMPT_THRESHOLD_SETTING = live_server.app.config['DATASTORE'].data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0) ATTEMPT_THRESHOLD_SETTING = g.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
for i in range(0, ATTEMPT_THRESHOLD_SETTING - 2): for i in range(0, ATTEMPT_THRESHOLD_SETTING - 2):
checked += 1 checked += 1
client.get(url_for("form_watch_checknow"), follow_redirects=True) client.get(url_for("form_watch_checknow"), follow_redirects=True)
@ -118,7 +118,7 @@ def run_filter_test(client, live_server, content_filter):
assert not os.path.isfile("test-datastore/notification.txt") assert not os.path.isfile("test-datastore/notification.txt")
time.sleep(1) time.sleep(1)
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 5 assert g.datastore.data['watching'][uuid]['consecutive_filter_failures'] == 5
time.sleep(2) time.sleep(2)
# One more check should trigger the _FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT threshold # One more check should trigger the _FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT threshold

@ -4,7 +4,7 @@ import time
import os import os
import json import json
import logging import logging
from flask import url_for from flask import url_for, g
from .util import live_server_setup, wait_for_all_checks from .util import live_server_setup, wait_for_all_checks
from urllib.parse import urlparse, parse_qs from urllib.parse import urlparse, parse_qs
@ -38,7 +38,7 @@ def test_consistent_history(client, live_server, measure_memory_usage):
time.sleep(2) time.sleep(2)
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json') json_db_file = os.path.join(g.datastore.datastore_path, 'url-watches.json')
json_obj = None json_obj = None
with open(json_db_file, 'r') as f: with open(json_db_file, 'r') as f:
@ -49,7 +49,7 @@ def test_consistent_history(client, live_server, measure_memory_usage):
# each one should have a history.txt containing just one line # each one should have a history.txt containing just one line
for w in json_obj['watching'].keys(): for w in json_obj['watching'].keys():
history_txt_index_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, 'history.txt') history_txt_index_file = os.path.join(g.datastore.datastore_path, w, 'history.txt')
assert os.path.isfile(history_txt_index_file), f"History.txt should exist where I expect it at {history_txt_index_file}" assert os.path.isfile(history_txt_index_file), f"History.txt should exist where I expect it at {history_txt_index_file}"
# Same like in model.Watch # Same like in model.Watch
@ -58,13 +58,13 @@ def test_consistent_history(client, live_server, measure_memory_usage):
assert len(tmp_history) == 1, "History.txt should contain 1 line" assert len(tmp_history) == 1, "History.txt should contain 1 line"
# Should be two files,. the history.txt , and the snapshot.txt # Should be two files,. the history.txt , and the snapshot.txt
files_in_watch_dir = os.listdir(os.path.join(live_server.app.config['DATASTORE'].datastore_path, files_in_watch_dir = os.listdir(os.path.join(g.datastore.datastore_path,
w)) w))
# Find the snapshot one # Find the snapshot one
for fname in files_in_watch_dir: for fname in files_in_watch_dir:
if fname != 'history.txt' and 'html' not in fname: if fname != 'history.txt' and 'html' not in fname:
# contents should match what we requested as content returned from the test url # contents should match what we requested as content returned from the test url
with open(os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname), 'r') as snapshot_f: with open(os.path.join(g.datastore.datastore_path, w, fname), 'r') as snapshot_f:
contents = snapshot_f.read() contents = snapshot_f.read()
watch_url = json_obj['watching'][w]['url'] watch_url = json_obj['watching'][w]['url']
u = urlparse(watch_url) u = urlparse(watch_url)
@ -76,6 +76,6 @@ def test_consistent_history(client, live_server, measure_memory_usage):
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot" assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json') json_db_file = os.path.join(g.datastore.datastore_path, 'url-watches.json')
with open(json_db_file, 'r') as f: with open(json_db_file, 'r') as f:
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved" assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"

@ -3,7 +3,7 @@ import io
import os import os
import time import time
from flask import url_for from flask import url_for, g
from .util import live_server_setup, wait_for_all_checks from .util import live_server_setup, wait_for_all_checks
@ -163,7 +163,7 @@ def test_import_custom_xlsx(client, live_server, measure_memory_usage):
assert b'City news results' in res.data assert b'City news results' in res.data
# Just find one to check over # Just find one to check over
for uuid, watch in live_server.app.config['DATASTORE'].data['watching'].items(): for uuid, watch in g.datastore.data['watching'].items():
if watch.get('title') == 'Somesite results ABC': if watch.get('title') == 'Somesite results ABC':
filters = watch.get('include_filters') filters = watch.get('include_filters')
assert filters[0] == '/html[1]/body[1]/div[4]/div[1]/div[1]/div[1]||//*[@id=\'content\']/div[3]/div[1]/div[1]||//*[@id=\'content\']/div[1]' assert filters[0] == '/html[1]/body[1]/div[4]/div[1]/div[1]/div[1]||//*[@id=\'content\']/div[3]/div[1]/div[1]||//*[@id=\'content\']/div[1]'
@ -201,7 +201,7 @@ def test_import_watchete_xlsx(client, live_server, measure_memory_usage):
assert b'City news results' in res.data assert b'City news results' in res.data
# Just find one to check over # Just find one to check over
for uuid, watch in live_server.app.config['DATASTORE'].data['watching'].items(): for uuid, watch in g.datastore.data['watching'].items():
if watch.get('title') == 'Somesite results ABC': if watch.get('title') == 'Somesite results ABC':
filters = watch.get('include_filters') filters = watch.get('include_filters')
assert filters[0] == '/html[1]/body[1]/div[4]/div[1]/div[1]/div[1]||//*[@id=\'content\']/div[3]/div[1]/div[1]||//*[@id=\'content\']/div[1]' assert filters[0] == '/html[1]/body[1]/div[4]/div[1]/div[1]/div[1]||//*[@id=\'content\']/div[3]/div[1]/div[1]||//*[@id=\'content\']/div[1]'

@ -1,7 +1,7 @@
import json import json
import os import os
import time import time
from flask import url_for from flask import url_for, g
from . util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_UUID_from_client from . util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_UUID_from_client
def test_setup(live_server): def test_setup(live_server):
@ -73,13 +73,13 @@ def test_headers_in_request(client, live_server, measure_memory_usage):
# Re #137 - It should have only one set of headers entered # Re #137 - It should have only one set of headers entered
watches_with_headers = 0 watches_with_headers = 0
for k, watch in client.application.config.get('DATASTORE').data.get('watching').items(): for k, watch in g.datastore.data.get('watching').items():
if (len(watch['headers'])): if (len(watch['headers'])):
watches_with_headers += 1 watches_with_headers += 1
assert watches_with_headers == 1 assert watches_with_headers == 1
# 'server' http header was automatically recorded # 'server' http header was automatically recorded
for k, watch in client.application.config.get('DATASTORE').data.get('watching').items(): for k, watch in g.datastore.data.get('watching').items():
assert 'custom' in watch.get('remote_server_reply') # added in util.py assert 'custom' in watch.get('remote_server_reply') # added in util.py
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True) res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from flask import make_response, request from flask import make_response, request
from flask import url_for from flask import url_for, g
import logging import logging
import time import time
@ -103,7 +103,7 @@ def extract_api_key_from_UI(client):
# kinda funky, but works for now # kinda funky, but works for now
def get_UUID_for_tag_name(client, name): def get_UUID_for_tag_name(client, name):
app_config = client.application.config.get('DATASTORE').data app_config = g.datastore.data
for uuid, tag in app_config['settings']['application'].get('tags', {}).items(): for uuid, tag in app_config['settings']['application'].get('tags', {}).items():
if name == tag.get('title', '').lower().strip(): if name == tag.get('title', '').lower().strip():
return uuid return uuid

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import os import os
from flask import url_for from flask import url_for, g
from ..util import live_server_setup, wait_for_all_checks, extract_UUID_from_client from ..util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
def test_setup(client, live_server, measure_memory_usage): def test_setup(client, live_server, measure_memory_usage):
@ -43,7 +43,7 @@ def test_visual_selector_content_ready(client, live_server, measure_memory_usage
wait_for_all_checks(client) wait_for_all_checks(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)" assert g.datastore.data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)"
res = client.get( res = client.get(
url_for("preview_page", uuid=uuid), url_for("preview_page", uuid=uuid),
@ -120,7 +120,7 @@ def test_basic_browserstep(client, live_server, measure_memory_usage):
wait_for_all_checks(client) wait_for_all_checks(client)
uuid = extract_UUID_from_client(client) uuid = extract_UUID_from_client(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)" assert g.datastore.data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)"
assert b"This text should be removed" not in res.data assert b"This text should be removed" not in res.data

Loading…
Cancel
Save