Security - Possible stored XSS in watch list - Only permit HTTP/HTTP/FTP by default - override with env var `SAFE_PROTOCOL_REGEX` (#1359)

ticket-1365-service-worker-defaults
dgtlmoon 2 years ago committed by GitHub
parent d47a25eb6d
commit f8e587c415
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -74,7 +74,6 @@ app.config['TEMPLATES_AUTO_RELOAD'] = True
app.jinja_env.add_extension('jinja2.ext.loopcontrols') app.jinja_env.add_extension('jinja2.ext.loopcontrols')
csrf = CSRFProtect() csrf = CSRFProtect()
csrf.init_app(app) csrf.init_app(app)
notification_debug_log=[] notification_debug_log=[]
watch_api = Api(app, decorators=[csrf.exempt]) watch_api = Api(app, decorators=[csrf.exempt])
@ -586,6 +585,7 @@ def changedetection_app(config=None, datastore_o=None):
if request.method == 'POST' and form.validate(): if request.method == 'POST' and form.validate():
extra_update_obj = {} extra_update_obj = {}
if request.args.get('unpause_on_save'): if request.args.get('unpause_on_save'):
@ -1133,7 +1133,8 @@ def changedetection_app(config=None, datastore_o=None):
form = forms.quickWatchForm(request.form) form = forms.quickWatchForm(request.form)
if not form.validate(): if not form.validate():
flash("Error") for widget, l in form.errors.items():
flash(','.join(l), 'error')
return redirect(url_for('index')) return redirect(url_for('index'))
url = request.form.get('url').strip() url = request.form.get('url').strip()
@ -1144,15 +1145,14 @@ def changedetection_app(config=None, datastore_o=None):
add_paused = request.form.get('edit_and_watch_submit_button') != None add_paused = request.form.get('edit_and_watch_submit_button') != None
new_uuid = datastore.add_watch(url=url, tag=request.form.get('tag').strip(), extras={'paused': add_paused}) new_uuid = datastore.add_watch(url=url, tag=request.form.get('tag').strip(), extras={'paused': add_paused})
if new_uuid:
if not add_paused and new_uuid: if add_paused:
# Straight into the queue. flash('Watch added in Paused state, saving will unpause.')
update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid})) return redirect(url_for('edit_page', uuid=new_uuid, unpause_on_save=1))
flash("Watch added.") else:
# Straight into the queue.
if add_paused: update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
flash('Watch added in Paused state, saving will unpause.') flash("Watch added.")
return redirect(url_for('edit_page', uuid=new_uuid, unpause_on_save=1))
return redirect(url_for('index')) return redirect(url_for('index'))
@ -1184,8 +1184,9 @@ def changedetection_app(config=None, datastore_o=None):
uuid = list(datastore.data['watching'].keys()).pop() uuid = list(datastore.data['watching'].keys()).pop()
new_uuid = datastore.clone(uuid) new_uuid = datastore.clone(uuid)
update_q.put(queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid, 'skip_when_checksum_same': True})) if new_uuid:
flash('Cloned.') update_q.put(queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid, 'skip_when_checksum_same': True}))
flash('Cloned.')
return redirect(url_for('index')) return redirect(url_for('index'))

@ -202,8 +202,11 @@ class CreateWatch(Resource):
del extras['url'] del extras['url']
new_uuid = self.datastore.add_watch(url=url, extras=extras) new_uuid = self.datastore.add_watch(url=url, extras=extras)
self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid, 'skip_when_checksum_same': True})) if new_uuid:
return {'uuid': new_uuid}, 201 self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid, 'skip_when_checksum_same': True}))
return {'uuid': new_uuid}, 201
else:
return "Invalid or unsupported URL", 400
@auth.check_token @auth.check_token
def get(self): def get(self):

@ -232,12 +232,17 @@ class validateURL(object):
def __call__(self, form, field): def __call__(self, form, field):
import validators import validators
try: try:
validators.url(field.data.strip()) validators.url(field.data.strip())
except validators.ValidationFailure: except validators.ValidationFailure:
message = field.gettext('\'%s\' is not a valid URL.' % (field.data.strip())) message = field.gettext('\'%s\' is not a valid URL.' % (field.data.strip()))
raise ValidationError(message) raise ValidationError(message)
from .model.Watch import is_safe_url
if not is_safe_url(field.data):
raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX')
class ValidateListRegex(object): class ValidateListRegex(object):
""" """

@ -1,9 +1,14 @@
from distutils.util import strtobool from distutils.util import strtobool
import logging import logging
import os import os
import re
import time import time
import uuid import uuid
# Allowable protocols, protects against javascript: etc
# file:// is further checked by ALLOW_FILE_URI
SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):'
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 60)) minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 60))
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7} mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
@ -55,6 +60,22 @@ base_config = {
'webdriver_js_execute_code': None, # Run before change-detection 'webdriver_js_execute_code': None, # Run before change-detection
} }
def is_safe_url(test_url):
# See https://github.com/dgtlmoon/changedetection.io/issues/1358
# Remove 'source:' prefix so we dont get 'source:javascript:' etc
# 'source:' is a valid way to tell us to return the source
r = re.compile(re.escape('source:'), re.IGNORECASE)
test_url = r.sub('', test_url)
pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE)
if not pattern.match(test_url.strip()):
return False
return True
class model(dict): class model(dict):
__newest_history_key = None __newest_history_key = None
__history_n = 0 __history_n = 0
@ -93,7 +114,11 @@ class model(dict):
@property @property
def link(self): def link(self):
url = self.get('url', '') url = self.get('url', '')
if not is_safe_url(url):
return 'DISABLED'
ready_url = url ready_url = url
if '{%' in url or '{{' in url: if '{%' in url or '{{' in url:
from jinja2 import Environment from jinja2 import Environment

@ -1,20 +1,20 @@
from flask import ( from flask import (
flash flash
) )
import json
import logging from . model import App, Watch
import os
import threading
import time
import uuid as uuid_builder
from copy import deepcopy from copy import deepcopy
from os import path, unlink from os import path, unlink
from threading import Lock from threading import Lock
import json
import logging
import os
import re import re
import requests import requests
import secrets import secrets
import threading
from . model import App, Watch import time
import uuid as uuid_builder
# Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods? # Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods?
# Open a github issue if you know something :) # Open a github issue if you know something :)
@ -309,9 +309,12 @@ class ChangeDetectionStore:
logging.error("Error fetching metadata for shared watch link", url, str(e)) logging.error("Error fetching metadata for shared watch link", url, str(e))
flash("Error fetching metadata for {}".format(url), 'error') flash("Error fetching metadata for {}".format(url), 'error')
return False return False
from .model.Watch import is_safe_url
if not is_safe_url(url):
flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
return None
with self.lock: with self.lock:
# #Re 569 # #Re 569
new_watch = Watch.model(datastore_path=self.datastore_path, default={ new_watch = Watch.model(datastore_path=self.datastore_path, default={
'url': url, 'url': url,

@ -2,11 +2,9 @@ from flask import url_for
from . util import set_original_response, set_modified_response, live_server_setup from . util import set_original_response, set_modified_response, live_server_setup
import time import time
def test_setup(live_server):
live_server_setup(live_server)
def test_file_access(client, live_server):
def test_bad_access(client, live_server):
live_server_setup(live_server)
res = client.post( res = client.post(
url_for("import_page"), url_for("import_page"),
data={"urls": 'https://localhost'}, data={"urls": 'https://localhost'},
@ -19,18 +17,49 @@ def test_file_access(client, live_server):
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid="first"),
data={ data={
"url": 'file:///etc/passwd', "url": 'javascript:alert(document.domain)',
"tag": "", "tag": "",
"method": "GET", "method": "GET",
"fetch_backend": "html_requests", "fetch_backend": "html_requests",
"body": ""}, "body": ""},
follow_redirects=True follow_redirects=True
) )
time.sleep(3)
res = client.get( assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
url_for("index", uuid="first"),
res = client.post(
url_for("form_quick_watch_add"),
data={"url": ' javascript:alert(123)', "tag": ''},
follow_redirects=True
)
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
res = client.post(
url_for("form_quick_watch_add"),
data={"url": '%20%20%20javascript:alert(123)%20%20', "tag": ''},
follow_redirects=True
)
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
res = client.post(
url_for("form_quick_watch_add"),
data={"url": ' source:javascript:alert(document.domain)', "tag": ''},
follow_redirects=True
)
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
# file:// is permitted by default, but it will be caught by ALLOW_FILE_URI
client.post(
url_for("form_quick_watch_add"),
data={"url": 'file:///tasty/disk/drive', "tag": ''},
follow_redirects=True follow_redirects=True
) )
time.sleep(1)
res = client.get(url_for("index"))
assert b'denied for security reasons' in res.data assert b'file:// type access is denied for security reasons.' in res.data
Loading…
Cancel
Save