diff --git a/.github/workflows/test-only.yml b/.github/workflows/test-only.yml index 8fb89d62..5483e6f6 100644 --- a/.github/workflows/test-only.yml +++ b/.github/workflows/test-only.yml @@ -59,6 +59,7 @@ jobs: echo "run test with unittest" docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_notification_diff' docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_watch_model' + docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_jinja2_security' # All tests echo "run test with pytest" diff --git a/changedetectionio/__init__.py b/changedetectionio/__init__.py index b46c7a3b..75de2d0e 100644 --- a/changedetectionio/__init__.py +++ b/changedetectionio/__init__.py @@ -7,7 +7,7 @@ __version__ = '0.45.20' from changedetectionio.strtobool import strtobool from json.decoder import JSONDecodeError import os -#os.environ['EVENTLET_NO_GREENDNS'] = 'yes' +os.environ['EVENTLET_NO_GREENDNS'] = 'yes' import eventlet import eventlet.wsgi import getopt diff --git a/changedetectionio/blueprint/browser_steps/browser_steps.py b/changedetectionio/blueprint/browser_steps/browser_steps.py index 6bb58b38..6aac2446 100644 --- a/changedetectionio/blueprint/browser_steps/browser_steps.py +++ b/changedetectionio/blueprint/browser_steps/browser_steps.py @@ -7,6 +7,7 @@ from random import randint from loguru import logger from changedetectionio.content_fetchers.base import manage_user_agent +from changedetectionio.safe_jinja import render as jinja_render # Two flags, tell the JS which of the "Selector" or "Value" field should be enabled in the front end # 0- off, 1- on @@ -64,14 +65,12 @@ class steppable_browser_interface(): action_handler = getattr(self, "action_" + call_action_name) # Support for Jinja2 variables in the value and selector - from jinja2 import Environment - jinja2_env = Environment(extensions=['jinja2_time.TimeExtension']) if selector and ('{%' in selector or '{{' in selector): - selector = str(jinja2_env.from_string(selector).render()) + selector = jinja_render(template_str=selector) if optional_value and ('{%' in optional_value or '{{' in optional_value): - optional_value = str(jinja2_env.from_string(optional_value).render()) + optional_value = jinja_render(template_str=optional_value) action_handler(selector, optional_value) self.page.wait_for_timeout(1.5 * 1000) diff --git a/changedetectionio/blueprint/check_proxies/__init__.py b/changedetectionio/blueprint/check_proxies/__init__.py index db4bbe62..62a7dab3 100644 --- a/changedetectionio/blueprint/check_proxies/__init__.py +++ b/changedetectionio/blueprint/check_proxies/__init__.py @@ -31,9 +31,9 @@ def construct_blueprint(datastore: ChangeDetectionStore): import time from changedetectionio.content_fetchers import exceptions as content_fetcher_exceptions from changedetectionio.processors import text_json_diff + from changedetectionio.safe_jinja import render as jinja_render status = {'status': '', 'length': 0, 'text': ''} - from jinja2 import Environment, BaseLoader contents = '' now = time.time() @@ -64,7 +64,9 @@ def construct_blueprint(datastore: ChangeDetectionStore): status.update({'status': 'OK', 'length': len(contents), 'text': ''}) if status.get('text'): - status['text'] = Environment(loader=BaseLoader()).from_string('{{text|e}}').render({'text': status['text']}) + # parse 'text' as text for safety + v = {'text': status['text']} + status['text'] = jinja_render(template_str='{{text|e}}', **v) status['time'] = "{:.2f}s".format(time.time() - now) diff --git a/changedetectionio/content_fetchers/base.py b/changedetectionio/content_fetchers/base.py index 756a9bef..ca2fd019 100644 --- a/changedetectionio/content_fetchers/base.py +++ b/changedetectionio/content_fetchers/base.py @@ -123,8 +123,7 @@ class Fetcher(): def iterate_browser_steps(self): from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface from playwright._impl._errors import TimeoutError, Error - from jinja2 import Environment - jinja2_env = Environment(extensions=['jinja2_time.TimeExtension']) + from changedetectionio.safe_jinja import render as jinja_render step_n = 0 @@ -143,9 +142,9 @@ class Fetcher(): selector = step['selector'] # Support for jinja2 template in step values, with date module added if '{%' in step['optional_value'] or '{{' in step['optional_value']: - optional_value = str(jinja2_env.from_string(step['optional_value']).render()) + optional_value = jinja_render(template_str=step['optional_value']) if '{%' in step['selector'] or '{{' in step['selector']: - selector = str(jinja2_env.from_string(step['selector']).render()) + selector = jinja_render(template_str=step['selector']) getattr(interface, "call_action")(action_name=step['operation'], selector=selector, diff --git a/changedetectionio/flask_app.py b/changedetectionio/flask_app.py index 9fb61664..9bd48fa6 100644 --- a/changedetectionio/flask_app.py +++ b/changedetectionio/flask_app.py @@ -5,11 +5,11 @@ import os import queue import threading import time -from copy import deepcopy +from .safe_jinja import render as jinja_render from changedetectionio.strtobool import strtobool +from copy import deepcopy from functools import wraps from threading import Event - import flask_login import pytz import timeago @@ -319,8 +319,6 @@ def changedetection_app(config=None, datastore_o=None): @app.route("/rss", methods=['GET']) def rss(): - from jinja2 import Environment, BaseLoader - jinja2_env = Environment(loader=BaseLoader) now = time.time() # Always requires token set app_rss_token = datastore.data['settings']['application'].get('rss_access_token') @@ -388,7 +386,7 @@ def changedetection_app(config=None, datastore_o=None): # @todo Make this configurable and also consider html-colored markup # @todo User could decide if goes to the diff page, or to the watch link rss_template = "\n

{{watch_title}}

\n

{{html_diff}}

\n\n" - content = jinja2_env.from_string(rss_template).render(watch_title=watch_title, html_diff=html_diff, watch_url=watch.link) + content = jinja_render(template_str=rss_template, watch_title=watch_title, html_diff=html_diff, watch_url=watch.link) fe.content(content=content, type='CDATA') diff --git a/changedetectionio/forms.py b/changedetectionio/forms.py index 4d408c61..4f74f978 100644 --- a/changedetectionio/forms.py +++ b/changedetectionio/forms.py @@ -236,21 +236,26 @@ class ValidateJinja2Template(object): def __call__(self, form, field): from changedetectionio import notification - from jinja2 import Environment, BaseLoader, TemplateSyntaxError, UndefinedError + from jinja2 import BaseLoader, TemplateSyntaxError, UndefinedError + from jinja2.sandbox import ImmutableSandboxedEnvironment from jinja2.meta import find_undeclared_variables + import jinja2.exceptions + # Might be a list of text, or might be just text (like from the apprise url list) + joined_data = ' '.join(map(str, field.data)) if isinstance(field.data, list) else f"{field.data}" try: - jinja2_env = Environment(loader=BaseLoader) + jinja2_env = ImmutableSandboxedEnvironment(loader=BaseLoader) jinja2_env.globals.update(notification.valid_tokens) - - rendered = jinja2_env.from_string(field.data).render() + jinja2_env.from_string(joined_data).render() except TemplateSyntaxError as e: raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e except UndefinedError as e: raise ValidationError(f"A variable or function is not defined: {e}") from e + except jinja2.exceptions.SecurityError as e: + raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e - ast = jinja2_env.parse(field.data) + ast = jinja2_env.parse(joined_data) undefined = ", ".join(find_undeclared_variables(ast)) if undefined: raise ValidationError( @@ -415,7 +420,7 @@ class quickWatchForm(Form): # Common to a single watch and the global settings class commonSettingsForm(Form): - notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers()]) + notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()]) notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()]) notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()]) notification_format = SelectField('Notification format', choices=valid_notification_formats.keys()) @@ -499,11 +504,9 @@ class watchForm(commonSettingsForm): result = False # Attempt to validate jinja2 templates in the URL - from jinja2 import Environment - # Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/ - jinja2_env = Environment(extensions=['jinja2_time.TimeExtension']) try: - ready_url = str(jinja2_env.from_string(self.url.data).render()) + from changedetectionio.safe_jinja import render as jinja_render + jinja_render(template_str=self.url.data) except Exception as e: self.url.errors.append('Invalid template syntax') result = False diff --git a/changedetectionio/model/Watch.py b/changedetectionio/model/Watch.py index b880b916..deffe9e5 100644 --- a/changedetectionio/model/Watch.py +++ b/changedetectionio/model/Watch.py @@ -1,4 +1,6 @@ from changedetectionio.strtobool import strtobool +from changedetectionio.safe_jinja import render as jinja_render + import os import re import time @@ -137,12 +139,11 @@ class model(dict): ready_url = url if '{%' in url or '{{' in url: - from jinja2 import Environment # Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/ - jinja2_env = Environment(extensions=['jinja2_time.TimeExtension']) try: - ready_url = str(jinja2_env.from_string(url).render()) + ready_url = jinja_render(template_str=url) except Exception as e: + logger.critical(f"Invalid URL template for: '{url}' - {str(e)}") from flask import ( flash, Markup, url_for ) diff --git a/changedetectionio/notification.py b/changedetectionio/notification.py index 21bda720..f634a1af 100644 --- a/changedetectionio/notification.py +++ b/changedetectionio/notification.py @@ -1,6 +1,5 @@ import apprise import time -from jinja2 import Environment, BaseLoader from apprise import NotifyFormat import json from loguru import logger @@ -116,6 +115,7 @@ def apprise_custom_api_call_wrapper(body, title, notify_type, *args, **kwargs): def process_notification(n_object, datastore): + from .safe_jinja import render as jinja_render now = time.time() if n_object.get('notification_timestamp'): logger.trace(f"Time since queued {now-n_object['notification_timestamp']:.3f}s") @@ -123,9 +123,9 @@ def process_notification(n_object, datastore): notification_parameters = create_notification_parameters(n_object, datastore) # Get the notification body from datastore - jinja2_env = Environment(loader=BaseLoader) - n_body = jinja2_env.from_string(n_object.get('notification_body', '')).render(**notification_parameters) - n_title = jinja2_env.from_string(n_object.get('notification_title', '')).render(**notification_parameters) + n_body = jinja_render(template_str=n_object.get('notification_body', ''), **notification_parameters) + n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters) + n_format = valid_notification_formats.get( n_object.get('notification_format', default_notification_format), valid_notification_formats[default_notification_format], @@ -157,7 +157,7 @@ def process_notification(n_object, datastore): continue logger.info(">> Process Notification: AppRise notifying {}".format(url)) - url = jinja2_env.from_string(url).render(**notification_parameters) + url = jinja_render(template_str=url, **notification_parameters) # Re 323 - Limit discord length to their 2000 char limit total or it wont send. # Because different notifications may require different pre-processing, run each sequentially :( diff --git a/changedetectionio/safe_jinja.py b/changedetectionio/safe_jinja.py new file mode 100644 index 00000000..8a6e1d38 --- /dev/null +++ b/changedetectionio/safe_jinja.py @@ -0,0 +1,18 @@ +""" +Safe Jinja2 render with max payload sizes + +See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations +""" + +import jinja2.sandbox +import typing as t +import os + +JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10)) + + +def render(template_str, **args: t.Any) -> str: + jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(extensions=['jinja2_time.TimeExtension']) + output = jinja2_env.from_string(template_str).render(args) + return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE] + diff --git a/changedetectionio/templates/watch-overview.html b/changedetectionio/templates/watch-overview.html index 61797b67..03cc21ca 100644 --- a/changedetectionio/templates/watch-overview.html +++ b/changedetectionio/templates/watch-overview.html @@ -169,7 +169,7 @@ {% if watch.uuid in queued_uuids %}Queued{% else %}Recheck{% endif %} - Edit + Edit {% if watch.history_n >= 2 %} {% if is_unviewed %} diff --git a/changedetectionio/tests/test_add_replace_remove_filter.py b/changedetectionio/tests/test_add_replace_remove_filter.py index 4ad9ecf8..f64d877b 100644 --- a/changedetectionio/tests/test_add_replace_remove_filter.py +++ b/changedetectionio/tests/test_add_replace_remove_filter.py @@ -1,5 +1,5 @@ #!/usr/bin/python3 - +import os.path import time from flask import url_for from .util import live_server_setup, wait_for_all_checks @@ -107,7 +107,6 @@ def test_check_add_line_contains_trigger(client, live_server): #live_server_setup(live_server) # Give the endpoint time to spin up - time.sleep(1) test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://') + "?xxx={{ watch_url }}" res = client.post( @@ -166,6 +165,7 @@ def test_check_add_line_contains_trigger(client, live_server): # Takes a moment for apprise to fire time.sleep(3) + assert os.path.isfile("test-datastore/notification.txt"), "Notification fired because I can see the output file" with open("test-datastore/notification.txt", 'r') as f: response= f.read() assert '-Oh yes please-' in response diff --git a/changedetectionio/tests/test_jinja2.py b/changedetectionio/tests/test_jinja2.py index 771dc5ff..1e08691b 100644 --- a/changedetectionio/tests/test_jinja2.py +++ b/changedetectionio/tests/test_jinja2.py @@ -2,15 +2,15 @@ import time from flask import url_for -from .util import live_server_setup +from .util import live_server_setup, wait_for_all_checks -# If there was only a change in the whitespacing, then we shouldnt have a change detected -def test_jinja2_in_url_query(client, live_server): +def test_setup(client, live_server): live_server_setup(live_server) - # Give the endpoint time to spin up - time.sleep(1) +# If there was only a change in the whitespacing, then we shouldnt have a change detected +def test_jinja2_in_url_query(client, live_server): + #live_server_setup(live_server) # Add our URL to the import page test_url = url_for('test_return_query', _external=True) @@ -24,10 +24,35 @@ def test_jinja2_in_url_query(client, live_server): follow_redirects=True ) assert b"Watch added" in res.data - time.sleep(3) + wait_for_all_checks(client) + # It should report nothing found (no new 'unviewed' class) res = client.get( url_for("preview_page", uuid="first"), follow_redirects=True ) assert b'date=2' in res.data + +# https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456 +def test_jinja2_security_url_query(client, live_server): + #live_server_setup(live_server) + + # Add our URL to the import page + test_url = url_for('test_return_query', _external=True) + + # because url_for() will URL-encode the var, but we dont here + full_url = "{}?{}".format(test_url, + "date={{ ''.__class__.__mro__[1].__subclasses__()}}", ) + res = client.post( + url_for("form_quick_watch_add"), + data={"url": full_url, "tags": "test"}, + follow_redirects=True + ) + assert b"Watch added" in res.data + wait_for_all_checks(client) + + # It should report nothing found (no new 'unviewed' class) + res = client.get(url_for("index")) + assert b'is invalid and cannot be used' in res.data + # Some of the spewed output from the subclasses + assert b'dict_values' not in res.data diff --git a/changedetectionio/tests/unit/test_jinja2_security.py b/changedetectionio/tests/unit/test_jinja2_security.py new file mode 100644 index 00000000..eb43db9d --- /dev/null +++ b/changedetectionio/tests/unit/test_jinja2_security.py @@ -0,0 +1,57 @@ +#!/usr/bin/python3 + +# run from dir above changedetectionio/ dir +# python3 -m unittest changedetectionio.tests.unit.test_jinja2_security + +import unittest +from changedetectionio import safe_jinja + + +# mostly +class TestJinja2SSTI(unittest.TestCase): + + def test_exception(self): + import jinja2 + + # Where sandbox should kick in + attempt_list = [ + "My name is {{ self.__init__.__globals__.__builtins__.__import__('os').system('id') }}", + "{{ self._TemplateReference__context.cycler.__init__.__globals__.os }}", + "{{ self.__init__.__globals__.__builtins__.__import__('os').popen('id').read() }}", + "{{cycler.__init__.__globals__.os.popen('id').read()}}", + "{{joiner.__init__.__globals__.os.popen('id').read()}}", + "{{namespace.__init__.__globals__.os.popen('id').read()}}", + "{{ ''.__class__.__mro__[2].__subclasses__()[40]('/tmp/hello.txt', 'w').write('Hello here !') }}", + "My name is {{ self.__init__.__globals__ }}", + "{{ dict.__base__.__subclasses__() }}" + ] + for attempt in attempt_list: + with self.assertRaises(jinja2.exceptions.SecurityError): + safe_jinja.render(attempt) + + def test_exception_debug_calls(self): + import jinja2 + # Where sandbox should kick in - configs and debug calls + attempt_list = [ + "{% debug %}", + ] + for attempt in attempt_list: + # Usually should be something like 'Encountered unknown tag 'debug'.' + with self.assertRaises(jinja2.exceptions.TemplateSyntaxError): + safe_jinja.render(attempt) + + # https://book.hacktricks.xyz/pentesting-web/ssti-server-side-template-injection/jinja2-ssti#accessing-global-objects + def test_exception_empty_calls(self): + import jinja2 + attempt_list = [ + "{{config}}", + "{{ debug }}" + "{{[].__class__}}", + ] + for attempt in attempt_list: + self.assertEqual(len(safe_jinja.render(attempt)), 0, f"string test '{attempt}' is correctly empty") + + + +if __name__ == '__main__': + unittest.main() diff --git a/changedetectionio/tests/util.py b/changedetectionio/tests/util.py index aab79163..186fc736 100644 --- a/changedetectionio/tests/util.py +++ b/changedetectionio/tests/util.py @@ -116,7 +116,7 @@ def extract_UUID_from_client(client): ) # {{api_key}} - m = re.search('edit/(.+?)"', str(res.data)) + m = re.search('edit/(.+?)[#"]', str(res.data)) uuid = m.group(1) return uuid.strip()