Eliminate globals usage

This will help running tests because the app isn't initialized
automatically by touching the "changedetectionio" package. Moving things
out of the __init__.py removes the side-effect of "import
changedetection" which means tests can control the state without
restarting.

This is the first step in making the tests run with only calling
"pytest". The fixture use and test setup need to be adjusted to not
depend on test ordering.
pull/2790/head
Kenny Root 1 month ago
parent feccb18cdc
commit 10068e1f24

@ -13,6 +13,7 @@ ARG CRYPTOGRAPHY_DONT_BUILD_RUST=1
RUN apt-get update && apt-get install -y --no-install-recommends \ RUN apt-get update && apt-get install -y --no-install-recommends \
g++ \ g++ \
gcc \ gcc \
git \
libc-dev \ libc-dev \
libffi-dev \ libffi-dev \
libjpeg-dev \ libjpeg-dev \

@ -2,5 +2,5 @@
# Only exists for direct CLI usage # Only exists for direct CLI usage
import changedetectionio from changedetectionio.__main__ import main
changedetectionio.main() main()

@ -3,193 +3,3 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki # Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.47.06' __version__ = '0.47.06'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError
import os
os.environ['EVENTLET_NO_GREENDNS'] = 'yes'
import eventlet
import eventlet.wsgi
import getopt
import signal
import socket
import sys
from changedetectionio import store
from changedetectionio.flask_app import changedetection_app
from loguru import logger
# Only global so we can access it in the signal handler
app = None
datastore = None
# Parent wrapper or OS sends us a SIGTERM/SIGINT, do everything required for a clean shutdown
def sigshutdown_handler(_signo, _stack_frame):
global app
global datastore
name = signal.Signals(_signo).name
logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Saving DB to disk and calling shutdown')
datastore.sync_to_json()
logger.success('Sync JSON to disk complete.')
# This will throw a SystemExit exception, because eventlet.wsgi.server doesn't know how to deal with it.
# Solution: move to gevent or other server in the future (#2014)
datastore.stop_thread = True
app.config.exit.set()
sys.exit()
def main():
global datastore
global app
datastore_path = None
do_cleanup = False
host = ''
ipv6_enabled = False
port = os.environ.get('PORT') or 5000
ssl_mode = False
# On Windows, create and use a default path.
if os.name == 'nt':
datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io')
os.makedirs(datastore_path, exist_ok=True)
else:
# Must be absolute so that send_from_directory doesnt try to make it relative to backend/
datastore_path = os.path.join(os.getcwd(), "../datastore")
try:
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:", "port")
except getopt.GetoptError:
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
sys.exit(2)
create_datastore_dir = False
# Set a default logger level
logger_level = 'DEBUG'
# Set a logger level via shell env variable
# Used: Dockerfile for CICD
# To set logger level for pytest, see the app function in tests/conftest.py
if os.getenv("LOGGER_LEVEL"):
level = os.getenv("LOGGER_LEVEL")
logger_level = int(level) if level.isdigit() else level.upper()
for opt, arg in opts:
if opt == '-s':
ssl_mode = True
if opt == '-h':
host = arg
if opt == '-p':
port = int(arg)
if opt == '-d':
datastore_path = arg
if opt == '-6':
logger.success("Enabling IPv6 listen support")
ipv6_enabled = True
# Cleanup (remove text files that arent in the index)
if opt == '-c':
do_cleanup = True
# Create the datadir if it doesnt exist
if opt == '-C':
create_datastore_dir = True
if opt == '-l':
logger_level = int(arg) if arg.isdigit() else arg.upper()
# Without this, a logger will be duplicated
logger.remove()
try:
log_level_for_stdout = { 'DEBUG', 'SUCCESS' }
logger.configure(handlers=[
{"sink": sys.stdout, "level": logger_level,
"filter" : lambda record: record['level'].name in log_level_for_stdout},
{"sink": sys.stderr, "level": logger_level,
"filter": lambda record: record['level'].name not in log_level_for_stdout},
])
# Catch negative number or wrong log level name
except ValueError:
print("Available log level names: TRACE, DEBUG(default), INFO, SUCCESS,"
" WARNING, ERROR, CRITICAL")
sys.exit(2)
# isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
app_config = {'datastore_path': datastore_path}
if not os.path.isdir(app_config['datastore_path']):
if create_datastore_dir:
os.mkdir(app_config['datastore_path'])
else:
logger.critical(
f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'"
f" does not exist, cannot start, please make sure the"
f" directory exists or specify a directory with the -d option.\n"
f"Or use the -C parameter to create the directory.")
sys.exit(2)
try:
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=__version__)
except JSONDecodeError as e:
# Dont' start if the JSON DB looks corrupt
logger.critical(f"ERROR: JSON DB or Proxy List JSON at '{app_config['datastore_path']}' appears to be corrupt, aborting.")
logger.critical(str(e))
return
app = changedetection_app(app_config, datastore)
signal.signal(signal.SIGTERM, sigshutdown_handler)
signal.signal(signal.SIGINT, sigshutdown_handler)
# Go into cleanup mode
if do_cleanup:
datastore.remove_unused_snapshots()
app.config['datastore_path'] = datastore_path
@app.context_processor
def inject_version():
return dict(right_sticky="v{}".format(datastore.data['version_tag']),
new_version_available=app.config['NEW_VERSION_AVAILABLE'],
has_password=datastore.data['settings']['application']['password'] != False
)
# Monitored websites will not receive a Referer header when a user clicks on an outgoing link.
# @Note: Incompatible with password login (and maybe other features) for now, submit a PR!
@app.after_request
def hide_referrer(response):
if strtobool(os.getenv("HIDE_REFERER", 'false')):
response.headers["Referrer-Policy"] = "no-referrer"
return response
# Proxy sub-directory support
# Set environment var USE_X_SETTINGS=1 on this script
# And then in your proxy_pass settings
#
# proxy_set_header Host "localhost";
# proxy_set_header X-Forwarded-Prefix /app;
if os.getenv('USE_X_SETTINGS'):
logger.info("USE_X_SETTINGS is ENABLED")
from werkzeug.middleware.proxy_fix import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1)
s_type = socket.AF_INET6 if ipv6_enabled else socket.AF_INET
if ssl_mode:
# @todo finalise SSL config, but this should get you in the right direction if you need it.
eventlet.wsgi.server(eventlet.wrap_ssl(eventlet.listen((host, port), s_type),
certfile='cert.pem',
keyfile='privkey.pem',
server_side=True), app)
else:
eventlet.wsgi.server(eventlet.listen((host, int(port)), s_type), app)

@ -0,0 +1,204 @@
from flask import Flask
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError
import os
os.environ['EVENTLET_NO_GREENDNS'] = 'yes'
import eventlet
import eventlet.wsgi
import getopt
import signal
import socket
import sys
from changedetectionio import store
from changedetectionio.flask_app import changedetection_app
from loguru import logger
from . import __version__
# Parent wrapper or OS sends us a SIGTERM/SIGINT, do everything required for a clean shutdown
class SigShutdownHandler(object):
def __init__(self, app):
self.app = app
signal.signal(signal.SIGTERM, lambda _signum, _frame: self._signal_handler("SIGTERM"))
signal.signal(signal.SIGINT, lambda _signum, _frame: self._signal_handler("SIGINT"))
def _signal_handler(self, signame):
logger.critical(f'Shutdown: Got Signal - {signame}, Saving DB to disk and calling shutdown')
datastore = self.app.config["DATASTORE"]
datastore.sync_to_json()
logger.success('Sync JSON to disk complete.')
# This will throw a SystemExit exception, because eventlet.wsgi.server doesn't know how to deal with it.
# Solution: move to gevent or other server in the future (#2014)
datastore.stop_thread = True
self.app.config.exit.set()
sys.exit(0)
def create_application() -> Flask:
datastore_path = None
do_cleanup = False
host = ''
ipv6_enabled = False
port = os.environ.get('PORT') or 5000
ssl_mode = False
# On Windows, create and use a default path.
if os.name == 'nt':
datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io')
os.makedirs(datastore_path, exist_ok=True)
else:
# Must be absolute so that send_from_directory doesnt try to make it relative to backend/
datastore_path = os.path.join(os.getcwd(), "../datastore")
try:
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:", "port")
except getopt.GetoptError:
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
sys.exit(2)
create_datastore_dir = False
# Set a default logger level
logger_level = 'DEBUG'
# Set a logger level via shell env variable
# Used: Dockerfile for CICD
# To set logger level for pytest, see the app function in tests/conftest.py
if os.getenv("LOGGER_LEVEL"):
level = os.getenv("LOGGER_LEVEL")
logger_level = int(level) if level.isdigit() else level.upper()
for opt, arg in opts:
if opt == '-s':
ssl_mode = True
if opt == '-h':
host = arg
if opt == '-p':
port = int(arg)
if opt == '-d':
datastore_path = arg
if opt == '-6':
logger.success("Enabling IPv6 listen support")
ipv6_enabled = True
# Cleanup (remove text files that arent in the index)
if opt == '-c':
do_cleanup = True
# Create the datadir if it doesnt exist
if opt == '-C':
create_datastore_dir = True
if opt == '-l':
logger_level = int(arg) if arg.isdigit() else arg.upper()
# Without this, a logger will be duplicated
logger.remove()
try:
log_level_for_stdout = { 'DEBUG', 'SUCCESS' }
logger.configure(handlers=[
{"sink": sys.stdout, "level": logger_level,
"filter" : lambda record: record['level'].name in log_level_for_stdout},
{"sink": sys.stderr, "level": logger_level,
"filter": lambda record: record['level'].name not in log_level_for_stdout},
])
# Catch negative number or wrong log level name
except ValueError:
print("Available log level names: TRACE, DEBUG(default), INFO, SUCCESS,"
" WARNING, ERROR, CRITICAL")
sys.exit(2)
# isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
app_config = {'datastore_path': datastore_path}
if not os.path.isdir(app_config['datastore_path']):
if create_datastore_dir:
os.mkdir(app_config['datastore_path'])
else:
logger.critical(
f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'"
f" does not exist, cannot start, please make sure the"
f" directory exists or specify a directory with the -d option.\n"
f"Or use the -C parameter to create the directory.")
sys.exit(2)
try:
datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=__version__)
except JSONDecodeError as e:
# Dont' start if the JSON DB looks corrupt
logger.critical(f"ERROR: JSON DB or Proxy List JSON at '{app_config['datastore_path']}' appears to be corrupt, aborting.")
logger.critical(str(e))
raise(e)
app = changedetection_app(app_config, datastore)
sigshutdown_handler = SigShutdownHandler(app)
# Go into cleanup mode
if do_cleanup:
datastore.remove_unused_snapshots()
app.config['datastore_path'] = datastore_path
@app.context_processor
def inject_version():
return dict(right_sticky="v{}".format(datastore.data['version_tag']),
new_version_available=app.config['NEW_VERSION_AVAILABLE'],
has_password=datastore.data['settings']['application']['password'] is not False
)
# Monitored websites will not receive a Referer header when a user clicks on an outgoing link.
# @Note: Incompatible with password login (and maybe other features) for now, submit a PR!
@app.after_request
def hide_referrer(response):
if strtobool(os.getenv("HIDE_REFERER", 'false')):
response.headers["Referrer-Policy"] = "no-referrer"
return response
# Proxy sub-directory support
# Set environment var USE_X_SETTINGS=1 on this script
# And then in your proxy_pass settings
#
# proxy_set_header Host "localhost";
# proxy_set_header X-Forwarded-Prefix /app;
if os.getenv('USE_X_SETTINGS'):
logger.info("USE_X_SETTINGS is ENABLED")
from werkzeug.middleware.proxy_fix import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1)
app.config["USE_IPV6"] = ipv6_enabled
app.config["USE_SSL"] = ssl_mode
app.config["HOST"] = host
app.config["PORT"] = port
return app
app = create_application()
def main():
from .__main__ import app
s_type = socket.AF_INET6 if app.config["USE_IPV6"] else socket.AF_INET
host = app.config["HOST"]
port = app.config["PORT"]
if app.config["USE_SSL"]:
# @todo finalise SSL config, but this should get you in the right direction if you need it.
eventlet.wsgi.server(eventlet.wrap_ssl(eventlet.listen((host, port), s_type),
certfile='cert.pem',
keyfile='privkey.pem',
server_side=True), app)
else:
eventlet.wsgi.server(eventlet.listen((host, int(port)), s_type), app)
if __name__ == "__main__":
main()

@ -1,7 +1,7 @@
from loguru import logger from loguru import logger
import hashlib import hashlib
import os import os
from changedetectionio import strtobool from changedetectionio.strtobool import strtobool
from changedetectionio.content_fetchers.exceptions import BrowserStepsInUnsupportedFetcher, EmptyReply, Non200ErrorCodeReceived from changedetectionio.content_fetchers.exceptions import BrowserStepsInUnsupportedFetcher, EmptyReply, Non200ErrorCodeReceived
from changedetectionio.content_fetchers.base import Fetcher from changedetectionio.content_fetchers.base import Fetcher

@ -23,6 +23,7 @@ from flask import (
Flask, Flask,
abort, abort,
flash, flash,
g,
make_response, make_response,
redirect, redirect,
render_template, render_template,
@ -45,8 +46,6 @@ from changedetectionio import html_tools, __version__
from changedetectionio import queuedWatchMetaData from changedetectionio import queuedWatchMetaData
from changedetectionio.api import api_v1 from changedetectionio.api import api_v1
datastore = None
# Local # Local
running_update_threads = [] running_update_threads = []
ticker_thread = None ticker_thread = None
@ -57,44 +56,14 @@ update_q = queue.PriorityQueue()
notification_q = queue.Queue() notification_q = queue.Queue()
MAX_QUEUE_SIZE = 2000 MAX_QUEUE_SIZE = 2000
app = Flask(__name__, def setup_locale():
static_url_path="", # get locale ready
static_folder="static", default_locale = locale.getdefaultlocale()
template_folder="templates") logger.info(f"System locale default is {default_locale}")
try:
# Enable CORS, especially useful for the Chrome extension to operate from anywhere locale.setlocale(locale.LC_ALL, default_locale)
CORS(app) except locale.Error:
logger.warning(f"Unable to set locale {default_locale}, locale is not installed maybe?")
# Super handy for compressing large BrowserSteps responses and others
FlaskCompress(app)
# Stop browser caching of assets
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
app.config.exit = Event()
app.config['NEW_VERSION_AVAILABLE'] = False
if os.getenv('FLASK_SERVER_NAME'):
app.config['SERVER_NAME'] = os.getenv('FLASK_SERVER_NAME')
#app.config["EXPLAIN_TEMPLATE_LOADING"] = True
# Disables caching of the templates
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.jinja_env.add_extension('jinja2.ext.loopcontrols')
csrf = CSRFProtect()
csrf.init_app(app)
notification_debug_log=[]
# Locale for correct presentation of prices etc
default_locale = locale.getdefaultlocale()
logger.info(f"System locale default is {default_locale}")
try:
locale.setlocale(locale.LC_ALL, default_locale)
except locale.Error:
logger.warning(f"Unable to set locale {default_locale}, locale is not installed maybe?")
watch_api = Api(app, decorators=[csrf.exempt])
def init_app_secret(datastore_path): def init_app_secret(datastore_path):
secret = "" secret = ""
@ -113,61 +82,6 @@ def init_app_secret(datastore_path):
return secret return secret
@app.template_global()
def get_darkmode_state():
css_dark_mode = request.cookies.get('css_dark_mode', 'false')
return 'true' if css_dark_mode and strtobool(css_dark_mode) else 'false'
@app.template_global()
def get_css_version():
return __version__
@app.template_filter('format_number_locale')
def _jinja2_filter_format_number_locale(value: float) -> str:
"Formats for example 4000.10 to the local locale default of 4,000.10"
# Format the number with two decimal places (locale format string will return 6 decimal)
formatted_value = locale.format_string("%.2f", value, grouping=True)
return formatted_value
# We use the whole watch object from the store/JSON so we can see if there's some related status in terms of a thread
# running or something similar.
@app.template_filter('format_last_checked_time')
def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
# Worker thread tells us which UUID it is currently processing.
for t in running_update_threads:
if t.current_uuid == watch_obj['uuid']:
return '<span class="spinner"></span><span> Checking now</span>'
if watch_obj['last_checked'] == 0:
return 'Not yet'
return timeago.format(int(watch_obj['last_checked']), time.time())
@app.template_filter('format_timestamp_timeago')
def _jinja2_filter_datetimestamp(timestamp, format="%Y-%m-%d %H:%M:%S"):
if not timestamp:
return 'Not yet'
return timeago.format(int(timestamp), time.time())
@app.template_filter('pagination_slice')
def _jinja2_filter_pagination_slice(arr, skip):
per_page = datastore.data['settings']['application'].get('pager_size', 50)
if per_page:
return arr[skip:skip + per_page]
return arr
@app.template_filter('format_seconds_ago')
def _jinja2_filter_seconds_precise(timestamp):
if timestamp == False:
return 'Not yet'
return format(int(time.time()-timestamp), ',d')
# When nobody is logged in Flask-Login's current_user is set to an AnonymousUser object. # When nobody is logged in Flask-Login's current_user is set to an AnonymousUser object.
class User(flask_login.UserMixin): class User(flask_login.UserMixin):
id=None id=None
@ -186,7 +100,7 @@ class User(flask_login.UserMixin):
return str(self.id) return str(self.id)
# Compare given password against JSON store or Env var # Compare given password against JSON store or Env var
def check_password(self, password): def check_password(self, password, *, datastore):
import base64 import base64
import hashlib import hashlib
@ -216,39 +130,65 @@ def login_optionally_required(func):
@wraps(func) @wraps(func)
def decorated_view(*args, **kwargs): def decorated_view(*args, **kwargs):
has_password_enabled = datastore.data['settings']['application'].get('password') or os.getenv("SALTED_PASS", False) has_password_enabled = g.datastore.data['settings']['application'].get('password') or os.getenv("SALTED_PASS", False)
# Permitted # Permitted
if request.endpoint == 'static_content' and request.view_args['group'] == 'styles': if request.endpoint == 'static_content' and request.view_args['group'] == 'styles':
return func(*args, **kwargs) return func(*args, **kwargs)
# Permitted # Permitted
elif request.endpoint == 'diff_history_page' and datastore.data['settings']['application'].get('shared_diff_access'): elif request.endpoint == 'diff_history_page' and g.datastore.data['settings']['application'].get('shared_diff_access'):
return func(*args, **kwargs) return func(*args, **kwargs)
elif request.method in flask_login.config.EXEMPT_METHODS: elif request.method in flask_login.config.EXEMPT_METHODS:
return func(*args, **kwargs) return func(*args, **kwargs)
elif app.config.get('LOGIN_DISABLED'): elif g.app.config.get('LOGIN_DISABLED'):
return func(*args, **kwargs) return func(*args, **kwargs)
elif has_password_enabled and not current_user.is_authenticated: elif has_password_enabled and not current_user.is_authenticated:
return app.login_manager.unauthorized() return g.app.login_manager.unauthorized()
return func(*args, **kwargs) return func(*args, **kwargs)
return decorated_view return decorated_view
def changedetection_app(config=None, datastore_o=None): def changedetection_app(config, datastore):
logger.trace("TRACE log is enabled") logger.trace("TRACE log is enabled")
global datastore app = Flask(__name__,
datastore = datastore_o static_url_path="",
static_folder="static",
template_folder="templates")
# so far just for read-only via tests, but this will be moved eventually to be the main source # Enable CORS, especially useful for the Chrome extension to operate from anywhere
# (instead of the global var) CORS(app)
app.config['DATASTORE'] = datastore_o
# Super handy for compressing large BrowserSteps responses and others
FlaskCompress(app)
# Stop browser caching of assets
app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0
app.config.exit = Event()
app.config['NEW_VERSION_AVAILABLE'] = False
if os.getenv('FLASK_SERVER_NAME'):
app.config['SERVER_NAME'] = os.getenv('FLASK_SERVER_NAME')
#app.config["EXPLAIN_TEMPLATE_LOADING"] = True
# Disables caching of the templates
app.config['TEMPLATES_AUTO_RELOAD'] = True
app.jinja_env.add_extension('jinja2.ext.loopcontrols')
csrf = CSRFProtect()
csrf.init_app(app)
app.config["notification_debug_log"] = []
app.config['DATASTORE'] = datastore
login_manager = flask_login.LoginManager(app) login_manager = flask_login.LoginManager(app)
login_manager.login_view = 'login' login_manager.login_view = 'login'
app.secret_key = init_app_secret(config['datastore_path']) app.secret_key = init_app_secret(config['datastore_path'])
watch_api = Api(app, decorators=[csrf.exempt])
watch_api.add_resource(api_v1.WatchSingleHistory, watch_api.add_resource(api_v1.WatchSingleHistory,
'/api/v1/watch/<string:uuid>/history/<string:timestamp>', '/api/v1/watch/<string:uuid>/history/<string:timestamp>',
@ -271,12 +211,61 @@ def changedetection_app(config=None, datastore_o=None):
'/api/v1/import', '/api/v1/import',
resource_class_kwargs={'datastore': datastore}) resource_class_kwargs={'datastore': datastore})
# Setup cors headers to allow all domains # Flask Templates
# https://flask-cors.readthedocs.io/en/latest/ @app.template_global()
# CORS(app) def get_darkmode_state():
css_dark_mode = request.cookies.get('css_dark_mode', 'false')
return 'true' if css_dark_mode and strtobool(css_dark_mode) else 'false'
@app.template_global()
def get_css_version():
return __version__
@app.template_filter('format_number_locale')
def _jinja2_filter_format_number_locale(value: float) -> str:
"Formats for example 4000.10 to the local locale default of 4,000.10"
# Format the number with two decimal places (locale format string will return 6 decimal)
formatted_value = locale.format_string("%.2f", value, grouping=True)
return formatted_value
# We use the whole watch object from the store/JSON so we can see if there's some related status in terms of a thread
# running or something similar.
@app.template_filter('format_last_checked_time')
def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
# Worker thread tells us which UUID it is currently processing.
for t in running_update_threads:
if t.current_uuid == watch_obj['uuid']:
return '<span class="spinner"></span><span> Checking now</span>'
if watch_obj['last_checked'] == 0:
return 'Not yet'
return timeago.format(int(watch_obj['last_checked']), time.time())
@app.template_filter('format_timestamp_timeago')
def _jinja2_filter_datetimestamp(timestamp, format="%Y-%m-%d %H:%M:%S"):
if not timestamp:
return 'Not yet'
return timeago.format(int(timestamp), time.time())
@app.template_filter('pagination_slice')
def _jinja2_filter_pagination_slice(arr, skip):
per_page = datastore.data['settings']['application'].get('pager_size', 50)
if per_page:
return arr[skip:skip + per_page]
return arr
@app.template_filter('format_seconds_ago')
def _jinja2_filter_seconds_precise(timestamp):
if timestamp == False:
return 'Not yet'
return format(int(time.time()-timestamp), ',d')
# Login Manager
@login_manager.user_loader @login_manager.user_loader
def user_loader(email): def user_loader(email):
user = User() user = User()
@ -311,7 +300,7 @@ def changedetection_app(config=None, datastore_o=None):
password = request.form.get('password') password = request.form.get('password')
if (user.check_password(password)): if (user.check_password(password, datastore=datastore)):
flask_login.login_user(user, remember=True) flask_login.login_user(user, remember=True)
# For now there's nothing else interesting here other than the index/list page # For now there's nothing else interesting here other than the index/list page
@ -331,6 +320,11 @@ def changedetection_app(config=None, datastore_o=None):
return redirect(url_for('login')) return redirect(url_for('login'))
@app.before_request
def remember_app_and_datastore():
g.app = app
g.datastore = datastore
@app.before_request @app.before_request
def before_request_handle_cookie_x_settings(): def before_request_handle_cookie_x_settings():
# Set the auth cookie path if we're running as X-settings/X-Forwarded-Prefix # Set the auth cookie path if we're running as X-settings/X-Forwarded-Prefix
@ -344,7 +338,7 @@ def changedetection_app(config=None, datastore_o=None):
def rss(): def rss():
now = time.time() now = time.time()
# Always requires token set # Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token') app_rss_token = g.datastore.data['settings']['application'].get('rss_access_token')
rss_url_token = request.args.get('token') rss_url_token = request.args.get('token')
if rss_url_token != app_rss_token: if rss_url_token != app_rss_token:
return "Access denied, bad token", 403 return "Access denied, bad token", 403
@ -429,7 +423,6 @@ def changedetection_app(config=None, datastore_o=None):
@app.route("/", methods=['GET']) @app.route("/", methods=['GET'])
@login_optionally_required @login_optionally_required
def index(): def index():
global datastore
from changedetectionio import forms from changedetectionio import forms
active_tag_req = request.args.get('tag', '').lower().strip() active_tag_req = request.args.get('tag', '').lower().strip()
@ -799,7 +792,7 @@ def changedetection_app(config=None, datastore_o=None):
# Recast it if need be to right data Watch handler # Recast it if need be to right data Watch handler
watch_class = get_custom_watch_obj_for_processor(form.data.get('processor')) watch_class = get_custom_watch_obj_for_processor(form.data.get('processor'))
datastore.data['watching'][uuid] = watch_class(datastore_path=datastore_o.datastore_path, default=datastore.data['watching'][uuid]) datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, default=datastore.data['watching'][uuid])
flash("Updated watch - unpaused!" if request.args.get('unpause_on_save') else "Updated watch.") flash("Updated watch - unpaused!" if request.args.get('unpause_on_save') else "Updated watch.")
# Re #286 - We wait for syncing new data to disk in another thread every 60 seconds # Re #286 - We wait for syncing new data to disk in another thread every 60 seconds
@ -1083,7 +1076,7 @@ def changedetection_app(config=None, datastore_o=None):
extract_regex = request.form.get('extract_regex').strip() extract_regex = request.form.get('extract_regex').strip()
output = watch.extract_regex_from_all_history(extract_regex) output = watch.extract_regex_from_all_history(extract_regex)
if output: if output:
watch_dir = os.path.join(datastore_o.datastore_path, uuid) watch_dir = os.path.join(datastore.datastore_path, uuid)
response = make_response(send_from_directory(directory=watch_dir, path=output, as_attachment=True)) response = make_response(send_from_directory(directory=watch_dir, path=output, as_attachment=True))
response.headers['Content-type'] = 'text/csv' response.headers['Content-type'] = 'text/csv'
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
@ -1238,7 +1231,8 @@ def changedetection_app(config=None, datastore_o=None):
@app.route("/settings/notification-logs", methods=['GET']) @app.route("/settings/notification-logs", methods=['GET'])
@login_optionally_required @login_optionally_required
def notification_logs(): def notification_logs():
global notification_debug_log notification_debug_log = app.config["notification_debug_log"]
output = render_template("notification-log.html", output = render_template("notification-log.html",
logs=notification_debug_log if len(notification_debug_log) else ["Notification logs are empty - no notifications sent yet."]) logs=notification_debug_log if len(notification_debug_log) else ["Notification logs are empty - no notifications sent yet."])
@ -1258,7 +1252,7 @@ def changedetection_app(config=None, datastore_o=None):
# These files should be in our subdirectory # These files should be in our subdirectory
try: try:
# set nocache, set content-type # set nocache, set content-type
response = make_response(send_from_directory(os.path.join(datastore_o.datastore_path, filename), screenshot_filename)) response = make_response(send_from_directory(os.path.join(datastore.datastore_path, filename), screenshot_filename))
response.headers['Content-type'] = 'image/png' response.headers['Content-type'] = 'image/png'
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
response.headers['Pragma'] = 'no-cache' response.headers['Pragma'] = 'no-cache'
@ -1278,7 +1272,7 @@ def changedetection_app(config=None, datastore_o=None):
try: try:
# set nocache, set content-type, # set nocache, set content-type,
# `filename` is actually directory UUID of the watch # `filename` is actually directory UUID of the watch
watch_directory = str(os.path.join(datastore_o.datastore_path, filename)) watch_directory = str(os.path.join(datastore.datastore_path, filename))
response = None response = None
if os.path.isfile(os.path.join(watch_directory, "elements.deflate")): if os.path.isfile(os.path.join(watch_directory, "elements.deflate")):
response = make_response(send_from_directory(watch_directory, "elements.deflate")) response = make_response(send_from_directory(watch_directory, "elements.deflate"))
@ -1338,7 +1332,6 @@ def changedetection_app(config=None, datastore_o=None):
from .processors.text_json_diff import prepare_filter_prevew from .processors.text_json_diff import prepare_filter_prevew
return prepare_filter_prevew(watch_uuid=uuid, datastore=datastore) return prepare_filter_prevew(watch_uuid=uuid, datastore=datastore)
@app.route("/form/add/quickwatch", methods=['POST']) @app.route("/form/add/quickwatch", methods=['POST'])
@login_optionally_required @login_optionally_required
def form_quick_watch_add(): def form_quick_watch_add():
@ -1369,8 +1362,6 @@ def changedetection_app(config=None, datastore_o=None):
return redirect(url_for('index')) return redirect(url_for('index'))
@app.route("/api/delete", methods=['GET']) @app.route("/api/delete", methods=['GET'])
@login_optionally_required @login_optionally_required
def form_delete(): def form_delete():
@ -1639,25 +1630,27 @@ def changedetection_app(config=None, datastore_o=None):
# @todo handle ctrl break # @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start() threading.Thread(target=ticker_thread_check_time_launch_checks, args=(app,)).start()
threading.Thread(target=notification_runner).start() threading.Thread(target=notification_runner, args=(app,)).start()
# Check for new release version, but not when running in test/build or pytest # Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')): if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')):
threading.Thread(target=check_for_new_version).start() threading.Thread(target=check_for_new_version, args=(app,)).start()
return app return app
# Check for new version and anonymous stats # Check for new version and anonymous stats
def check_for_new_version(): def check_for_new_version(app, url="https://changedetection.io/check-ver.php", delay_time=86400):
import requests import requests
import urllib3 import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
datastore = app.config["DATASTORE"]
while not app.config.exit.is_set(): while not app.config.exit.is_set():
try: try:
r = requests.post("https://changedetection.io/check-ver.php", r = requests.post(url,
data={'version': __version__, data={'version': __version__,
'app_guid': datastore.data['app_guid'], 'app_guid': datastore.data['app_guid'],
'watch_count': len(datastore.data['watching']) 'watch_count': len(datastore.data['watching'])
@ -1674,11 +1667,10 @@ def check_for_new_version():
pass pass
# Check daily # Check daily
app.config.exit.wait(86400) app.config.exit.wait(delay_time)
def notification_runner(): def notification_runner(app):
global notification_debug_log
from datetime import datetime from datetime import datetime
import json import json
while not app.config.exit.is_set(): while not app.config.exit.is_set():
@ -1693,6 +1685,10 @@ def notification_runner():
now = datetime.now() now = datetime.now()
sent_obj = None sent_obj = None
notification_debug_log = app.config["notification_debug_log"]
datastore = app.config["DATASTORE"]
try: try:
from changedetectionio import notification from changedetectionio import notification
# Fallback to system config if not set # Fallback to system config if not set
@ -1724,9 +1720,12 @@ def notification_runner():
notification_debug_log = notification_debug_log[-100:] notification_debug_log = notification_debug_log[-100:]
# Threaded runner, look for new watches to feed into the Queue. # Threaded runner, look for new watches to feed into the Queue.
def ticker_thread_check_time_launch_checks(): def ticker_thread_check_time_launch_checks(app):
import random import random
from changedetectionio import update_worker from changedetectionio import update_worker
datastore = app.config["DATASTORE"]
proxy_last_called_time = {} proxy_last_called_time = {}
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)) recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))

@ -1,7 +1,7 @@
import os import os
import uuid import uuid
from changedetectionio import strtobool from changedetectionio.strtobool import strtobool
from changedetectionio.notification import default_notification_format_for_watch from changedetectionio.notification import default_notification_format_for_watch
class watch_base(dict): class watch_base(dict):

@ -4,21 +4,17 @@ import time
from threading import Thread from threading import Thread
import pytest import pytest
from changedetectionio import changedetection_app from changedetectionio.flask_app import changedetection_app
from changedetectionio import store from changedetectionio import store
import os import os
import sys import sys
from loguru import logger from loguru import logger
# https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py # https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py
# Much better boilerplate than the docs # Much better boilerplate than the docs
# https://www.python-boilerplate.com/py3+flask+pytest/ # https://www.python-boilerplate.com/py3+flask+pytest/
global app
# https://loguru.readthedocs.io/en/latest/resources/migration.html#replacing-caplog-fixture-from-pytest-library
# Show loguru logs only if CICD pytest fails.
from loguru import logger
@pytest.fixture @pytest.fixture
def reportlog(pytestconfig): def reportlog(pytestconfig):
logging_plugin = pytestconfig.pluginmanager.getplugin("logging-plugin") logging_plugin = pytestconfig.pluginmanager.getplugin("logging-plugin")

@ -4,7 +4,7 @@ from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
import time import time
from .. import strtobool from changedetectionio.strtobool import strtobool
def test_setup(client, live_server, measure_memory_usage): def test_setup(client, live_server, measure_memory_usage):

Loading…
Cancel
Save