You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
229 lines
8.3 KiB
229 lines
8.3 KiB
from wtforms import Form, SelectField, RadioField, BooleanField, StringField, PasswordField, validators, IntegerField, fields, TextAreaField, \
|
|
Field
|
|
from wtforms import widgets
|
|
from wtforms.validators import ValidationError
|
|
from wtforms.fields import html5
|
|
from changedetectionio import content_fetcher
|
|
import re
|
|
|
|
class StringListField(StringField):
|
|
widget = widgets.TextArea()
|
|
|
|
def _value(self):
|
|
if self.data:
|
|
return "\r\n".join(self.data)
|
|
else:
|
|
return u''
|
|
|
|
# incoming
|
|
def process_formdata(self, valuelist):
|
|
if valuelist:
|
|
# Remove empty strings
|
|
cleaned = list(filter(None, valuelist[0].split("\n")))
|
|
self.data = [x.strip() for x in cleaned]
|
|
p = 1
|
|
else:
|
|
self.data = []
|
|
|
|
|
|
|
|
class SaltyPasswordField(StringField):
|
|
widget = widgets.PasswordInput()
|
|
encrypted_password = ""
|
|
|
|
def build_password(self, password):
|
|
import hashlib
|
|
import base64
|
|
import secrets
|
|
|
|
# Make a new salt on every new password and store it with the password
|
|
salt = secrets.token_bytes(32)
|
|
|
|
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
|
|
store = base64.b64encode(salt + key).decode('ascii')
|
|
|
|
return store
|
|
|
|
# incoming
|
|
def process_formdata(self, valuelist):
|
|
if valuelist:
|
|
# Be really sure it's non-zero in length
|
|
if len(valuelist[0].strip()) > 0:
|
|
self.encrypted_password = self.build_password(valuelist[0])
|
|
self.data = ""
|
|
else:
|
|
self.data = False
|
|
|
|
|
|
# Separated by key:value
|
|
class StringDictKeyValue(StringField):
|
|
widget = widgets.TextArea()
|
|
|
|
def _value(self):
|
|
if self.data:
|
|
output = u''
|
|
for k in self.data.keys():
|
|
output += "{}: {}\r\n".format(k, self.data[k])
|
|
|
|
return output
|
|
else:
|
|
return u''
|
|
|
|
# incoming
|
|
def process_formdata(self, valuelist):
|
|
if valuelist:
|
|
self.data = {}
|
|
# Remove empty strings
|
|
cleaned = list(filter(None, valuelist[0].split("\n")))
|
|
for s in cleaned:
|
|
parts = s.strip().split(':', 1)
|
|
if len(parts) == 2:
|
|
self.data.update({parts[0].strip(): parts[1].strip()})
|
|
|
|
else:
|
|
self.data = {}
|
|
|
|
class ValidateContentFetcherIsReady(object):
|
|
"""
|
|
Validates that anything that looks like a regex passes as a regex
|
|
"""
|
|
def __init__(self, message=None):
|
|
self.message = message
|
|
|
|
def __call__(self, form, field):
|
|
from changedetectionio import content_fetcher
|
|
import urllib3.exceptions
|
|
|
|
# Better would be a radiohandler that keeps a reference to each class
|
|
if field.data is not None:
|
|
klass = getattr(content_fetcher, field.data)
|
|
some_object = klass()
|
|
try:
|
|
ready = some_object.is_ready()
|
|
|
|
except urllib3.exceptions.MaxRetryError as e:
|
|
driver_url = some_object.command_executor
|
|
message = field.gettext('Content fetcher \'%s\' did not respond.' % (field.data))
|
|
message += '<br/>'+field.gettext('Be sure that the selenium/webdriver runner is running and accessible via network from this container/host.')
|
|
message += '<br/>' + field.gettext('Did you follow the instructions in the wiki?')
|
|
message += '<br/><br/>' + field.gettext('WebDriver Host: %s' % (driver_url))
|
|
message += '<br/><a href="https://github.com/dgtlmoon/changedetection.io/wiki/Fetching-pages-with-WebDriver">Go here for more information</a>'
|
|
|
|
raise ValidationError(message)
|
|
|
|
except Exception as e:
|
|
message = field.gettext('Content fetcher \'%s\' did not respond properly, unable to use it.\n %s')
|
|
raise ValidationError(message % (field.data, e))
|
|
|
|
|
|
class ValidateAppRiseServers(object):
|
|
"""
|
|
Validates that each URL given is compatible with AppRise
|
|
"""
|
|
|
|
def __init__(self, message=None):
|
|
self.message = message
|
|
|
|
def __call__(self, form, field):
|
|
import apprise
|
|
apobj = apprise.Apprise()
|
|
|
|
for server_url in field.data:
|
|
if not apobj.add(server_url):
|
|
message = field.gettext('\'%s\' is not a valid AppRise URL.' % (server_url))
|
|
raise ValidationError(message)
|
|
|
|
class ValidateTokensList(object):
|
|
"""
|
|
Validates that a {token} is from a valid set
|
|
"""
|
|
def __init__(self, message=None):
|
|
self.message = message
|
|
|
|
def __call__(self, form, field):
|
|
from changedetectionio import notification
|
|
regex = re.compile('{.*?}')
|
|
for p in re.findall(regex, field.data):
|
|
if not p.strip('{}') in notification.valid_tokens:
|
|
message = field.gettext('Token \'%s\' is not a valid token.')
|
|
raise ValidationError(message % (p))
|
|
|
|
class ValidateListRegex(object):
|
|
"""
|
|
Validates that anything that looks like a regex passes as a regex
|
|
"""
|
|
def __init__(self, message=None):
|
|
self.message = message
|
|
|
|
def __call__(self, form, field):
|
|
|
|
for line in field.data:
|
|
if line[0] == '/' and line[-1] == '/':
|
|
# Because internally we dont wrap in /
|
|
line = line.strip('/')
|
|
try:
|
|
re.compile(line)
|
|
except re.error:
|
|
message = field.gettext('RegEx \'%s\' is not a valid regular expression.')
|
|
raise ValidationError(message % (line))
|
|
|
|
class ValidateCSSJSONInput(object):
|
|
"""
|
|
Filter validation
|
|
@todo CSS validator ;)
|
|
"""
|
|
|
|
def __init__(self, message=None):
|
|
self.message = message
|
|
|
|
def __call__(self, form, field):
|
|
if 'json:' in field.data:
|
|
from jsonpath_ng.exceptions import JsonPathParserError
|
|
from jsonpath_ng import jsonpath, parse
|
|
|
|
input = field.data.replace('json:', '')
|
|
|
|
try:
|
|
parse(input)
|
|
except JsonPathParserError as e:
|
|
message = field.gettext('\'%s\' is not a valid JSONPath expression. (%s)')
|
|
raise ValidationError(message % (input, str(e)))
|
|
|
|
|
|
class quickWatchForm(Form):
|
|
# https://wtforms.readthedocs.io/en/2.3.x/fields/#module-wtforms.fields.html5
|
|
# `require_tld` = False is needed even for the test harness "http://localhost:5005.." to run
|
|
url = html5.URLField('URL', [validators.URL(require_tld=False)])
|
|
tag = StringField('Group tag', [validators.Optional(), validators.Length(max=35)])
|
|
|
|
class commonSettingsForm(Form):
|
|
|
|
notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers()])
|
|
notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {watch_url}', validators=[validators.Optional(), ValidateTokensList()])
|
|
notification_body = TextAreaField('Notification Body', default='{watch_url} had a change.', validators=[validators.Optional(), ValidateTokensList()])
|
|
trigger_check = BooleanField('Send test notification on save')
|
|
fetch_backend = RadioField(u'Fetch Method', choices=content_fetcher.available_fetchers(), validators=[ValidateContentFetcherIsReady()])
|
|
|
|
class watchForm(commonSettingsForm):
|
|
|
|
url = html5.URLField('URL', [validators.URL(require_tld=False)])
|
|
tag = StringField('Group tag', [validators.Optional(), validators.Length(max=35)])
|
|
|
|
minutes_between_check = html5.IntegerField('Maximum time in minutes until recheck',
|
|
[validators.Optional(), validators.NumberRange(min=1)])
|
|
css_filter = StringField('CSS/JSON Filter', [ValidateCSSJSONInput()])
|
|
title = StringField('Title')
|
|
|
|
ignore_text = StringListField('Ignore Text', [ValidateListRegex()])
|
|
headers = StringDictKeyValue('Request Headers')
|
|
trigger_text = StringListField('Trigger/wait for text', [validators.Optional(), ValidateListRegex()])
|
|
|
|
|
|
class globalSettingsForm(commonSettingsForm):
|
|
|
|
password = SaltyPasswordField()
|
|
|
|
minutes_between_check = html5.IntegerField('Maximum time in minutes until recheck',
|
|
[validators.NumberRange(min=1)])
|
|
extract_title_as_title = BooleanField('Extract <title> from document and use as watch title')
|