You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
112 lines
3.5 KiB
112 lines
3.5 KiB
#!/usr/bin/env python3
|
|
import resource
|
|
import time
|
|
from threading import Thread
|
|
|
|
import pytest
|
|
from changedetectionio import changedetection_app
|
|
from changedetectionio import store
|
|
import os
|
|
import sys
|
|
from loguru import logger
|
|
|
|
# https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py
|
|
# Much better boilerplate than the docs
|
|
# https://www.python-boilerplate.com/py3+flask+pytest/
|
|
|
|
global app
|
|
|
|
# https://loguru.readthedocs.io/en/latest/resources/migration.html#replacing-caplog-fixture-from-pytest-library
|
|
# Show loguru logs only if CICD pytest fails.
|
|
from loguru import logger
|
|
@pytest.fixture
|
|
def reportlog(pytestconfig):
|
|
logging_plugin = pytestconfig.pluginmanager.getplugin("logging-plugin")
|
|
handler_id = logger.add(logging_plugin.report_handler, format="{message}")
|
|
yield
|
|
logger.remove(handler_id)
|
|
|
|
|
|
def track_memory(memory_usage, ):
|
|
while not memory_usage["stop"]:
|
|
max_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
|
|
memory_usage["peak"] = max(memory_usage["peak"], max_rss)
|
|
time.sleep(0.01) # Adjust the sleep time as needed
|
|
|
|
@pytest.fixture(scope='function')
|
|
def measure_memory_usage(request):
|
|
memory_usage = {"peak": 0, "stop": False}
|
|
tracker_thread = Thread(target=track_memory, args=(memory_usage,))
|
|
tracker_thread.start()
|
|
|
|
yield
|
|
|
|
memory_usage["stop"] = True
|
|
tracker_thread.join()
|
|
|
|
# Note: ru_maxrss is in kilobytes on Unix-based systems
|
|
max_memory_used = memory_usage["peak"] / 1024 # Convert to MB
|
|
s = f"Peak memory used by the test {request.node.fspath} - '{request.node.name}': {max_memory_used:.2f} MB"
|
|
logger.debug(s)
|
|
|
|
# Assert that the memory usage is less than 200MB
|
|
# assert max_memory_used < 150, f"Memory usage exceeded 200MB: {max_memory_used:.2f} MB"
|
|
|
|
|
|
def cleanup(datastore_path):
|
|
import glob
|
|
# Unlink test output files
|
|
|
|
for g in ["*.txt", "*.json", "*.pdf"]:
|
|
files = glob.glob(os.path.join(datastore_path, g))
|
|
for f in files:
|
|
if 'proxies.json' in f:
|
|
# Usually mounted by docker container during test time
|
|
continue
|
|
if os.path.isfile(f):
|
|
os.unlink(f)
|
|
|
|
@pytest.fixture(scope='session')
|
|
def app(request):
|
|
"""Create application for the tests."""
|
|
datastore_path = "./test-datastore"
|
|
|
|
# So they don't delay in fetching
|
|
os.environ["MINIMUM_SECONDS_RECHECK_TIME"] = "0"
|
|
try:
|
|
os.mkdir(datastore_path)
|
|
except FileExistsError:
|
|
pass
|
|
|
|
cleanup(datastore_path)
|
|
|
|
app_config = {'datastore_path': datastore_path, 'disable_checkver' : True}
|
|
cleanup(app_config['datastore_path'])
|
|
|
|
logger_level = 'TRACE'
|
|
|
|
logger.remove()
|
|
log_level_for_stdout = { 'DEBUG', 'SUCCESS' }
|
|
logger.configure(handlers=[
|
|
{"sink": sys.stdout, "level": logger_level,
|
|
"filter" : lambda record: record['level'].name in log_level_for_stdout},
|
|
{"sink": sys.stderr, "level": logger_level,
|
|
"filter": lambda record: record['level'].name not in log_level_for_stdout},
|
|
])
|
|
|
|
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], include_default_watches=False)
|
|
app = changedetection_app(app_config, datastore)
|
|
|
|
# Disable CSRF while running tests
|
|
app.config['WTF_CSRF_ENABLED'] = False
|
|
app.config['STOP_THREADS'] = True
|
|
|
|
def teardown():
|
|
datastore.stop_thread = True
|
|
app.config.exit.set()
|
|
cleanup(app_config['datastore_path'])
|
|
|
|
|
|
request.addfinalizer(teardown)
|
|
yield app
|