Use deepcopy to stop possible data corruption (#1108)

bugfix-hanging-when-deleted
dgtlmoon 2 years ago committed by GitHub
parent 06bcfb28e5
commit b043d477dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -15,7 +15,6 @@ class FilterNotFoundInResponse(ValueError):
ValueError.__init__(self, msg) ValueError.__init__(self, msg)
# Some common stuff here that can be moved to a base class # Some common stuff here that can be moved to a base class
# (set_proxy_from_list) # (set_proxy_from_list)
class perform_site_check(): class perform_site_check():
@ -39,18 +38,20 @@ class perform_site_check():
return regex return regex
def run(self, uuid): def run(self, uuid):
from copy import deepcopy
changed_detected = False changed_detected = False
screenshot = False # as bytes screenshot = False # as bytes
stripped_text_from_html = "" stripped_text_from_html = ""
watch = self.datastore.data['watching'].get(uuid) # DeepCopy so we can be sure we don't accidently change anything by reference
watch = deepcopy(self.datastore.data['watching'].get(uuid))
if not watch: if not watch:
return return
# Protect against file:// access # Protect against file:// access
if re.search(r'^file', watch['url'], re.IGNORECASE) and not os.getenv('ALLOW_FILE_URI', False): if re.search(r'^file', watch.get('url', ''), re.IGNORECASE) and not os.getenv('ALLOW_FILE_URI', False):
raise Exception( raise Exception(
"file:// type access is denied for security reasons." "file:// type access is denied for security reasons."
) )
@ -58,10 +59,10 @@ class perform_site_check():
# Unset any existing notification error # Unset any existing notification error
update_obj = {'last_notification_error': False, 'last_error': False} update_obj = {'last_notification_error': False, 'last_error': False}
extra_headers =self.datastore.data['watching'][uuid].get('headers') extra_headers = watch.get('headers', [])
# Tweak the base config with the per-watch ones # Tweak the base config with the per-watch ones
request_headers = self.datastore.data['settings']['headers'].copy() request_headers = deepcopy(self.datastore.data['settings']['headers'])
request_headers.update(extra_headers) request_headers.update(extra_headers)
# https://github.com/psf/requests/issues/4525 # https://github.com/psf/requests/issues/4525
@ -85,7 +86,7 @@ class perform_site_check():
is_source = True is_source = True
# Pluggable content fetcher # Pluggable content fetcher
prefer_backend = watch['fetch_backend'] prefer_backend = watch.get('fetch_backend')
if hasattr(content_fetcher, prefer_backend): if hasattr(content_fetcher, prefer_backend):
klass = getattr(content_fetcher, prefer_backend) klass = getattr(content_fetcher, prefer_backend)
else: else:
@ -96,21 +97,21 @@ class perform_site_check():
proxy_url = None proxy_url = None
if proxy_id: if proxy_id:
proxy_url = self.datastore.proxy_list.get(proxy_id).get('url') proxy_url = self.datastore.proxy_list.get(proxy_id).get('url')
print ("UUID {} Using proxy {}".format(uuid, proxy_url)) print("UUID {} Using proxy {}".format(uuid, proxy_url))
fetcher = klass(proxy_override=proxy_url) fetcher = klass(proxy_override=proxy_url)
# Configurable per-watch or global extra delay before extracting text (for webDriver types) # Configurable per-watch or global extra delay before extracting text (for webDriver types)
system_webdriver_delay = self.datastore.data['settings']['application'].get('webdriver_delay', None) system_webdriver_delay = self.datastore.data['settings']['application'].get('webdriver_delay', None)
if watch['webdriver_delay'] is not None: if watch['webdriver_delay'] is not None:
fetcher.render_extract_delay = watch['webdriver_delay'] fetcher.render_extract_delay = watch.get('webdriver_delay')
elif system_webdriver_delay is not None: elif system_webdriver_delay is not None:
fetcher.render_extract_delay = system_webdriver_delay fetcher.render_extract_delay = system_webdriver_delay
if watch['webdriver_js_execute_code'] is not None and watch['webdriver_js_execute_code'].strip(): if watch.get('webdriver_js_execute_code') is not None and watch.get('webdriver_js_execute_code').strip():
fetcher.webdriver_js_execute_code = watch['webdriver_js_execute_code'] fetcher.webdriver_js_execute_code = watch.get('webdriver_js_execute_code')
fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_codes, watch['include_filters']) fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_codes, watch.get('include_filters'))
fetcher.quit() fetcher.quit()
self.screenshot = fetcher.screenshot self.screenshot = fetcher.screenshot
@ -135,7 +136,7 @@ class perform_site_check():
is_json = False is_json = False
include_filters_rule = watch.get('include_filters', []) include_filters_rule = watch.get('include_filters', [])
#include_filters_rule = watch['include_filters'] # include_filters_rule = watch['include_filters']
subtractive_selectors = watch.get( subtractive_selectors = watch.get(
"subtractive_selectors", [] "subtractive_selectors", []
) + self.datastore.data["settings"]["application"].get( ) + self.datastore.data["settings"]["application"].get(
@ -179,8 +180,8 @@ class perform_site_check():
else: else:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content += html_tools.include_filters(include_filters=filter_rule, html_content += html_tools.include_filters(include_filters=filter_rule,
html_content=fetcher.content, html_content=fetcher.content,
append_pretty_line_formatting=not is_source) append_pretty_line_formatting=not is_source)
if not html_content.strip(): if not html_content.strip():
raise FilterNotFoundInResponse(include_filters_rule) raise FilterNotFoundInResponse(include_filters_rule)
@ -192,12 +193,11 @@ class perform_site_check():
stripped_text_from_html = html_content stripped_text_from_html = html_content
else: else:
# extract text # extract text
do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
stripped_text_from_html = \ stripped_text_from_html = \
html_tools.html_to_text( html_tools.html_to_text(
html_content, html_content,
render_anchor_tag_content=self.datastore.data["settings"][ render_anchor_tag_content=do_anchor
"application"].get(
"render_anchor_tag_content", False)
) )
# Re #340 - return the content before the 'ignore text' was applied # Re #340 - return the content before the 'ignore text' was applied
@ -232,7 +232,7 @@ class perform_site_check():
for l in result: for l in result:
if type(l) is tuple: if type(l) is tuple:
#@todo - some formatter option default (between groups) # @todo - some formatter option default (between groups)
regex_matched_output += list(l) + [b'\n'] regex_matched_output += list(l) + [b'\n']
else: else:
# @todo - some formatter option default (between each ungrouped result) # @todo - some formatter option default (between each ungrouped result)
@ -246,7 +246,6 @@ class perform_site_check():
stripped_text_from_html = b''.join(regex_matched_output) stripped_text_from_html = b''.join(regex_matched_output)
text_content_before_ignored_filter = stripped_text_from_html text_content_before_ignored_filter = stripped_text_from_html
# Re #133 - if we should strip whitespaces from triggering the change detected comparison # Re #133 - if we should strip whitespaces from triggering the change detected comparison
if self.datastore.data['settings']['application'].get('ignore_whitespace', False): if self.datastore.data['settings']['application'].get('ignore_whitespace', False):
fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest() fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest()
@ -256,29 +255,30 @@ class perform_site_check():
############ Blocking rules, after checksum ################# ############ Blocking rules, after checksum #################
blocked = False blocked = False
if len(watch['trigger_text']): trigger_text = watch.get('trigger_text', [])
if len(trigger_text):
# Assume blocked # Assume blocked
blocked = True blocked = True
# Filter and trigger works the same, so reuse it # Filter and trigger works the same, so reuse it
# It should return the line numbers that match # It should return the line numbers that match
result = html_tools.strip_ignore_text(content=str(stripped_text_from_html), result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
wordlist=watch['trigger_text'], wordlist=trigger_text,
mode="line numbers") mode="line numbers")
# Unblock if the trigger was found # Unblock if the trigger was found
if result: if result:
blocked = False blocked = False
text_should_not_be_present = watch.get('text_should_not_be_present', [])
if len(watch['text_should_not_be_present']): if len(text_should_not_be_present):
# If anything matched, then we should block a change from happening # If anything matched, then we should block a change from happening
result = html_tools.strip_ignore_text(content=str(stripped_text_from_html), result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
wordlist=watch['text_should_not_be_present'], wordlist=text_should_not_be_present,
mode="line numbers") mode="line numbers")
if result: if result:
blocked = True blocked = True
# The main thing that all this at the moment comes down to :) # The main thing that all this at the moment comes down to :)
if watch['previous_md5'] != fetched_md5: if watch.get('previous_md5') != fetched_md5:
changed_detected = True changed_detected = True
# Looks like something changed, but did it match all the rules? # Looks like something changed, but did it match all the rules?
@ -287,7 +287,7 @@ class perform_site_check():
# Extract title as title # Extract title as title
if is_html: if is_html:
if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']: if self.datastore.data['settings']['application'].get('extract_title_as_title') or watch['extract_title_as_title']:
if not watch['title'] or not len(watch['title']): if not watch['title'] or not len(watch['title']):
update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content) update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)

Loading…
Cancel
Save