Merge pull request from GHSA-4r7v-whpg-8rx3

* CVE-2024-32651 - Security fix - Server Side Template Injection in Jinja2 allows Remote Command Execution

* use ImmutableSandboxedEnvironment also in validation
pull/2337/head
dgtlmoon 8 months ago committed by GitHub
parent 1ba29655f5
commit bd6eda696c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -59,6 +59,7 @@ jobs:
echo "run test with unittest" echo "run test with unittest"
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_notification_diff' docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_notification_diff'
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_watch_model' docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_watch_model'
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_jinja2_security'
# All tests # All tests
echo "run test with pytest" echo "run test with pytest"

@ -7,7 +7,7 @@ __version__ = '0.45.20'
from changedetectionio.strtobool import strtobool from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
import os import os
#os.environ['EVENTLET_NO_GREENDNS'] = 'yes' os.environ['EVENTLET_NO_GREENDNS'] = 'yes'
import eventlet import eventlet
import eventlet.wsgi import eventlet.wsgi
import getopt import getopt

@ -7,6 +7,7 @@ from random import randint
from loguru import logger from loguru import logger
from changedetectionio.content_fetchers.base import manage_user_agent from changedetectionio.content_fetchers.base import manage_user_agent
from changedetectionio.safe_jinja import render as jinja_render
# Two flags, tell the JS which of the "Selector" or "Value" field should be enabled in the front end # Two flags, tell the JS which of the "Selector" or "Value" field should be enabled in the front end
# 0- off, 1- on # 0- off, 1- on
@ -64,14 +65,12 @@ class steppable_browser_interface():
action_handler = getattr(self, "action_" + call_action_name) action_handler = getattr(self, "action_" + call_action_name)
# Support for Jinja2 variables in the value and selector # Support for Jinja2 variables in the value and selector
from jinja2 import Environment
jinja2_env = Environment(extensions=['jinja2_time.TimeExtension'])
if selector and ('{%' in selector or '{{' in selector): if selector and ('{%' in selector or '{{' in selector):
selector = str(jinja2_env.from_string(selector).render()) selector = jinja_render(template_str=selector)
if optional_value and ('{%' in optional_value or '{{' in optional_value): if optional_value and ('{%' in optional_value or '{{' in optional_value):
optional_value = str(jinja2_env.from_string(optional_value).render()) optional_value = jinja_render(template_str=optional_value)
action_handler(selector, optional_value) action_handler(selector, optional_value)
self.page.wait_for_timeout(1.5 * 1000) self.page.wait_for_timeout(1.5 * 1000)

@ -31,9 +31,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
import time import time
from changedetectionio.content_fetchers import exceptions as content_fetcher_exceptions from changedetectionio.content_fetchers import exceptions as content_fetcher_exceptions
from changedetectionio.processors import text_json_diff from changedetectionio.processors import text_json_diff
from changedetectionio.safe_jinja import render as jinja_render
status = {'status': '', 'length': 0, 'text': ''} status = {'status': '', 'length': 0, 'text': ''}
from jinja2 import Environment, BaseLoader
contents = '' contents = ''
now = time.time() now = time.time()
@ -64,7 +64,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
status.update({'status': 'OK', 'length': len(contents), 'text': ''}) status.update({'status': 'OK', 'length': len(contents), 'text': ''})
if status.get('text'): if status.get('text'):
status['text'] = Environment(loader=BaseLoader()).from_string('{{text|e}}').render({'text': status['text']}) # parse 'text' as text for safety
v = {'text': status['text']}
status['text'] = jinja_render(template_str='{{text|e}}', **v)
status['time'] = "{:.2f}s".format(time.time() - now) status['time'] = "{:.2f}s".format(time.time() - now)

@ -123,8 +123,7 @@ class Fetcher():
def iterate_browser_steps(self): def iterate_browser_steps(self):
from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
from playwright._impl._errors import TimeoutError, Error from playwright._impl._errors import TimeoutError, Error
from jinja2 import Environment from changedetectionio.safe_jinja import render as jinja_render
jinja2_env = Environment(extensions=['jinja2_time.TimeExtension'])
step_n = 0 step_n = 0
@ -143,9 +142,9 @@ class Fetcher():
selector = step['selector'] selector = step['selector']
# Support for jinja2 template in step values, with date module added # Support for jinja2 template in step values, with date module added
if '{%' in step['optional_value'] or '{{' in step['optional_value']: if '{%' in step['optional_value'] or '{{' in step['optional_value']:
optional_value = str(jinja2_env.from_string(step['optional_value']).render()) optional_value = jinja_render(template_str=step['optional_value'])
if '{%' in step['selector'] or '{{' in step['selector']: if '{%' in step['selector'] or '{{' in step['selector']:
selector = str(jinja2_env.from_string(step['selector']).render()) selector = jinja_render(template_str=step['selector'])
getattr(interface, "call_action")(action_name=step['operation'], getattr(interface, "call_action")(action_name=step['operation'],
selector=selector, selector=selector,

@ -5,11 +5,11 @@ import os
import queue import queue
import threading import threading
import time import time
from copy import deepcopy from .safe_jinja import render as jinja_render
from changedetectionio.strtobool import strtobool from changedetectionio.strtobool import strtobool
from copy import deepcopy
from functools import wraps from functools import wraps
from threading import Event from threading import Event
import flask_login import flask_login
import pytz import pytz
import timeago import timeago
@ -319,8 +319,6 @@ def changedetection_app(config=None, datastore_o=None):
@app.route("/rss", methods=['GET']) @app.route("/rss", methods=['GET'])
def rss(): def rss():
from jinja2 import Environment, BaseLoader
jinja2_env = Environment(loader=BaseLoader)
now = time.time() now = time.time()
# Always requires token set # Always requires token set
app_rss_token = datastore.data['settings']['application'].get('rss_access_token') app_rss_token = datastore.data['settings']['application'].get('rss_access_token')
@ -388,7 +386,7 @@ def changedetection_app(config=None, datastore_o=None):
# @todo Make this configurable and also consider html-colored markup # @todo Make this configurable and also consider html-colored markup
# @todo User could decide if <link> goes to the diff page, or to the watch link # @todo User could decide if <link> goes to the diff page, or to the watch link
rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_title}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n" rss_template = "<html><body>\n<h4><a href=\"{{watch_url}}\">{{watch_title}}</a></h4>\n<p>{{html_diff}}</p>\n</body></html>\n"
content = jinja2_env.from_string(rss_template).render(watch_title=watch_title, html_diff=html_diff, watch_url=watch.link) content = jinja_render(template_str=rss_template, watch_title=watch_title, html_diff=html_diff, watch_url=watch.link)
fe.content(content=content, type='CDATA') fe.content(content=content, type='CDATA')

@ -236,21 +236,26 @@ class ValidateJinja2Template(object):
def __call__(self, form, field): def __call__(self, form, field):
from changedetectionio import notification from changedetectionio import notification
from jinja2 import Environment, BaseLoader, TemplateSyntaxError, UndefinedError from jinja2 import BaseLoader, TemplateSyntaxError, UndefinedError
from jinja2.sandbox import ImmutableSandboxedEnvironment
from jinja2.meta import find_undeclared_variables from jinja2.meta import find_undeclared_variables
import jinja2.exceptions
# Might be a list of text, or might be just text (like from the apprise url list)
joined_data = ' '.join(map(str, field.data)) if isinstance(field.data, list) else f"{field.data}"
try: try:
jinja2_env = Environment(loader=BaseLoader) jinja2_env = ImmutableSandboxedEnvironment(loader=BaseLoader)
jinja2_env.globals.update(notification.valid_tokens) jinja2_env.globals.update(notification.valid_tokens)
jinja2_env.from_string(joined_data).render()
rendered = jinja2_env.from_string(field.data).render()
except TemplateSyntaxError as e: except TemplateSyntaxError as e:
raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e
except UndefinedError as e: except UndefinedError as e:
raise ValidationError(f"A variable or function is not defined: {e}") from e raise ValidationError(f"A variable or function is not defined: {e}") from e
except jinja2.exceptions.SecurityError as e:
raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e
ast = jinja2_env.parse(field.data) ast = jinja2_env.parse(joined_data)
undefined = ", ".join(find_undeclared_variables(ast)) undefined = ", ".join(find_undeclared_variables(ast))
if undefined: if undefined:
raise ValidationError( raise ValidationError(
@ -415,7 +420,7 @@ class quickWatchForm(Form):
# Common to a single watch and the global settings # Common to a single watch and the global settings
class commonSettingsForm(Form): class commonSettingsForm(Form):
notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers()]) notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()])
notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()]) notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()])
notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()]) notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()])
notification_format = SelectField('Notification format', choices=valid_notification_formats.keys()) notification_format = SelectField('Notification format', choices=valid_notification_formats.keys())
@ -499,11 +504,9 @@ class watchForm(commonSettingsForm):
result = False result = False
# Attempt to validate jinja2 templates in the URL # Attempt to validate jinja2 templates in the URL
from jinja2 import Environment
# Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/
jinja2_env = Environment(extensions=['jinja2_time.TimeExtension'])
try: try:
ready_url = str(jinja2_env.from_string(self.url.data).render()) from changedetectionio.safe_jinja import render as jinja_render
jinja_render(template_str=self.url.data)
except Exception as e: except Exception as e:
self.url.errors.append('Invalid template syntax') self.url.errors.append('Invalid template syntax')
result = False result = False

@ -1,4 +1,6 @@
from changedetectionio.strtobool import strtobool from changedetectionio.strtobool import strtobool
from changedetectionio.safe_jinja import render as jinja_render
import os import os
import re import re
import time import time
@ -137,12 +139,11 @@ class model(dict):
ready_url = url ready_url = url
if '{%' in url or '{{' in url: if '{%' in url or '{{' in url:
from jinja2 import Environment
# Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/ # Jinja2 available in URLs along with https://pypi.org/project/jinja2-time/
jinja2_env = Environment(extensions=['jinja2_time.TimeExtension'])
try: try:
ready_url = str(jinja2_env.from_string(url).render()) ready_url = jinja_render(template_str=url)
except Exception as e: except Exception as e:
logger.critical(f"Invalid URL template for: '{url}' - {str(e)}")
from flask import ( from flask import (
flash, Markup, url_for flash, Markup, url_for
) )

@ -1,6 +1,5 @@
import apprise import apprise
import time import time
from jinja2 import Environment, BaseLoader
from apprise import NotifyFormat from apprise import NotifyFormat
import json import json
from loguru import logger from loguru import logger
@ -116,6 +115,7 @@ def apprise_custom_api_call_wrapper(body, title, notify_type, *args, **kwargs):
def process_notification(n_object, datastore): def process_notification(n_object, datastore):
from .safe_jinja import render as jinja_render
now = time.time() now = time.time()
if n_object.get('notification_timestamp'): if n_object.get('notification_timestamp'):
logger.trace(f"Time since queued {now-n_object['notification_timestamp']:.3f}s") logger.trace(f"Time since queued {now-n_object['notification_timestamp']:.3f}s")
@ -123,9 +123,9 @@ def process_notification(n_object, datastore):
notification_parameters = create_notification_parameters(n_object, datastore) notification_parameters = create_notification_parameters(n_object, datastore)
# Get the notification body from datastore # Get the notification body from datastore
jinja2_env = Environment(loader=BaseLoader) n_body = jinja_render(template_str=n_object.get('notification_body', ''), **notification_parameters)
n_body = jinja2_env.from_string(n_object.get('notification_body', '')).render(**notification_parameters) n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters)
n_title = jinja2_env.from_string(n_object.get('notification_title', '')).render(**notification_parameters)
n_format = valid_notification_formats.get( n_format = valid_notification_formats.get(
n_object.get('notification_format', default_notification_format), n_object.get('notification_format', default_notification_format),
valid_notification_formats[default_notification_format], valid_notification_formats[default_notification_format],
@ -157,7 +157,7 @@ def process_notification(n_object, datastore):
continue continue
logger.info(">> Process Notification: AppRise notifying {}".format(url)) logger.info(">> Process Notification: AppRise notifying {}".format(url))
url = jinja2_env.from_string(url).render(**notification_parameters) url = jinja_render(template_str=url, **notification_parameters)
# Re 323 - Limit discord length to their 2000 char limit total or it wont send. # Re 323 - Limit discord length to their 2000 char limit total or it wont send.
# Because different notifications may require different pre-processing, run each sequentially :( # Because different notifications may require different pre-processing, run each sequentially :(

@ -0,0 +1,18 @@
"""
Safe Jinja2 render with max payload sizes
See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations
"""
import jinja2.sandbox
import typing as t
import os
JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))
def render(template_str, **args: t.Any) -> str:
jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(extensions=['jinja2_time.TimeExtension'])
output = jinja2_env.from_string(template_str).render(args)
return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE]

@ -169,7 +169,7 @@
<td> <td>
<a {% if watch.uuid in queued_uuids %}disabled="true"{% endif %} href="{{ url_for('form_watch_checknow', uuid=watch.uuid, tag=request.args.get('tag')) }}" <a {% if watch.uuid in queued_uuids %}disabled="true"{% endif %} href="{{ url_for('form_watch_checknow', uuid=watch.uuid, tag=request.args.get('tag')) }}"
class="recheck pure-button pure-button-primary">{% if watch.uuid in queued_uuids %}Queued{% else %}Recheck{% endif %}</a> class="recheck pure-button pure-button-primary">{% if watch.uuid in queued_uuids %}Queued{% else %}Recheck{% endif %}</a>
<a href="{{ url_for('edit_page', uuid=watch.uuid)}}" class="pure-button pure-button-primary">Edit</a> <a href="{{ url_for('edit_page', uuid=watch.uuid)}}#general" class="pure-button pure-button-primary">Edit</a>
{% if watch.history_n >= 2 %} {% if watch.history_n >= 2 %}
{% if is_unviewed %} {% if is_unviewed %}

@ -1,5 +1,5 @@
#!/usr/bin/python3 #!/usr/bin/python3
import os.path
import time import time
from flask import url_for from flask import url_for
from .util import live_server_setup, wait_for_all_checks from .util import live_server_setup, wait_for_all_checks
@ -107,7 +107,6 @@ def test_check_add_line_contains_trigger(client, live_server):
#live_server_setup(live_server) #live_server_setup(live_server)
# Give the endpoint time to spin up # Give the endpoint time to spin up
time.sleep(1)
test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://') + "?xxx={{ watch_url }}" test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://') + "?xxx={{ watch_url }}"
res = client.post( res = client.post(
@ -166,6 +165,7 @@ def test_check_add_line_contains_trigger(client, live_server):
# Takes a moment for apprise to fire # Takes a moment for apprise to fire
time.sleep(3) time.sleep(3)
assert os.path.isfile("test-datastore/notification.txt"), "Notification fired because I can see the output file"
with open("test-datastore/notification.txt", 'r') as f: with open("test-datastore/notification.txt", 'r') as f:
response= f.read() response= f.read()
assert '-Oh yes please-' in response assert '-Oh yes please-' in response

@ -2,15 +2,15 @@
import time import time
from flask import url_for from flask import url_for
from .util import live_server_setup from .util import live_server_setup, wait_for_all_checks
# If there was only a change in the whitespacing, then we shouldnt have a change detected def test_setup(client, live_server):
def test_jinja2_in_url_query(client, live_server):
live_server_setup(live_server) live_server_setup(live_server)
# Give the endpoint time to spin up # If there was only a change in the whitespacing, then we shouldnt have a change detected
time.sleep(1) def test_jinja2_in_url_query(client, live_server):
#live_server_setup(live_server)
# Add our URL to the import page # Add our URL to the import page
test_url = url_for('test_return_query', _external=True) test_url = url_for('test_return_query', _external=True)
@ -24,10 +24,35 @@ def test_jinja2_in_url_query(client, live_server):
follow_redirects=True follow_redirects=True
) )
assert b"Watch added" in res.data assert b"Watch added" in res.data
time.sleep(3) wait_for_all_checks(client)
# It should report nothing found (no new 'unviewed' class) # It should report nothing found (no new 'unviewed' class)
res = client.get( res = client.get(
url_for("preview_page", uuid="first"), url_for("preview_page", uuid="first"),
follow_redirects=True follow_redirects=True
) )
assert b'date=2' in res.data assert b'date=2' in res.data
# https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456
def test_jinja2_security_url_query(client, live_server):
#live_server_setup(live_server)
# Add our URL to the import page
test_url = url_for('test_return_query', _external=True)
# because url_for() will URL-encode the var, but we dont here
full_url = "{}?{}".format(test_url,
"date={{ ''.__class__.__mro__[1].__subclasses__()}}", )
res = client.post(
url_for("form_quick_watch_add"),
data={"url": full_url, "tags": "test"},
follow_redirects=True
)
assert b"Watch added" in res.data
wait_for_all_checks(client)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'is invalid and cannot be used' in res.data
# Some of the spewed output from the subclasses
assert b'dict_values' not in res.data

@ -0,0 +1,57 @@
#!/usr/bin/python3
# run from dir above changedetectionio/ dir
# python3 -m unittest changedetectionio.tests.unit.test_jinja2_security
import unittest
from changedetectionio import safe_jinja
# mostly
class TestJinja2SSTI(unittest.TestCase):
def test_exception(self):
import jinja2
# Where sandbox should kick in
attempt_list = [
"My name is {{ self.__init__.__globals__.__builtins__.__import__('os').system('id') }}",
"{{ self._TemplateReference__context.cycler.__init__.__globals__.os }}",
"{{ self.__init__.__globals__.__builtins__.__import__('os').popen('id').read() }}",
"{{cycler.__init__.__globals__.os.popen('id').read()}}",
"{{joiner.__init__.__globals__.os.popen('id').read()}}",
"{{namespace.__init__.__globals__.os.popen('id').read()}}",
"{{ ''.__class__.__mro__[2].__subclasses__()[40]('/tmp/hello.txt', 'w').write('Hello here !') }}",
"My name is {{ self.__init__.__globals__ }}",
"{{ dict.__base__.__subclasses__() }}"
]
for attempt in attempt_list:
with self.assertRaises(jinja2.exceptions.SecurityError):
safe_jinja.render(attempt)
def test_exception_debug_calls(self):
import jinja2
# Where sandbox should kick in - configs and debug calls
attempt_list = [
"{% debug %}",
]
for attempt in attempt_list:
# Usually should be something like 'Encountered unknown tag 'debug'.'
with self.assertRaises(jinja2.exceptions.TemplateSyntaxError):
safe_jinja.render(attempt)
# https://book.hacktricks.xyz/pentesting-web/ssti-server-side-template-injection/jinja2-ssti#accessing-global-objects
def test_exception_empty_calls(self):
import jinja2
attempt_list = [
"{{config}}",
"{{ debug }}"
"{{[].__class__}}",
]
for attempt in attempt_list:
self.assertEqual(len(safe_jinja.render(attempt)), 0, f"string test '{attempt}' is correctly empty")
if __name__ == '__main__':
unittest.main()

@ -116,7 +116,7 @@ def extract_UUID_from_client(client):
) )
# <span id="api-key">{{api_key}}</span> # <span id="api-key">{{api_key}}</span>
m = re.search('edit/(.+?)"', str(res.data)) m = re.search('edit/(.+?)[#"]', str(res.data))
uuid = m.group(1) uuid = m.group(1)
return uuid.strip() return uuid.strip()

Loading…
Cancel
Save