From b043d477dcd55cdfe3d26fccaa8e0dcfb5bd53b3 Mon Sep 17 00:00:00 2001
From: dgtlmoon
Date: Tue, 8 Nov 2022 12:18:38 +0100
Subject: [PATCH] Use deepcopy to stop possible data corruption (#1108)

---
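Reviewer context: self.datastore.data['watching'].get(uuid) returns a live
reference into the shared datastore, so anything the update thread mutates on
`watch` (including nested dicts) is written straight back into application
state while other threads read it. A minimal sketch of that aliasing hazard,
using simplified stand-in data rather than code from this repository:

    from copy import deepcopy

    datastore = {'watching': {'uuid-1': {'url': 'https://example.com',
                                         'headers': {'x-test': 'a'}}}}

    # A plain lookup hands back a reference to the shared dict:
    watch = datastore['watching']['uuid-1']
    watch['headers']['x-test'] = 'oops'
    print(datastore['watching']['uuid-1']['headers'])  # {'x-test': 'oops'} - corrupted

    # deepcopy gives the check its own private copy, nested dicts included:
    watch = deepcopy(datastore['watching']['uuid-1'])
    watch['headers']['x-test'] = 'safe'
    print(datastore['watching']['uuid-1']['headers'])  # still {'x-test': 'oops'} - untouched

The same reasoning applies to the global settings headers below: dict.copy()
is only a shallow copy, so replacing top-level keys was safe, but any nested
value would still have been shared between the copy and the original;
deepcopy removes that risk as well.
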
 changedetectionio/fetch_site_status.py | 56 +++++++++++++-------------
 1 file changed, 28 insertions(+), 28 deletions(-)

diff --git a/changedetectionio/fetch_site_status.py b/changedetectionio/fetch_site_status.py
index b291a466..31c0bb7f 100644
--- a/changedetectionio/fetch_site_status.py
+++ b/changedetectionio/fetch_site_status.py
@@ -15,7 +15,6 @@ class FilterNotFoundInResponse(ValueError):
         ValueError.__init__(self, msg)
 
 
-
 # Some common stuff here that can be moved to a base class
 # (set_proxy_from_list)
 class perform_site_check():
@@ -39,18 +38,20 @@ class perform_site_check():
 
         return regex
 
-
     def run(self, uuid):
+        from copy import deepcopy
         changed_detected = False
         screenshot = False  # as bytes
         stripped_text_from_html = ""
 
-        watch = self.datastore.data['watching'].get(uuid)
+        # DeepCopy so we can be sure we don't accidentally change anything by reference
+        watch = deepcopy(self.datastore.data['watching'].get(uuid))
+
         if not watch:
             return
 
         # Protect against file:// access
-        if re.search(r'^file', watch['url'], re.IGNORECASE) and not os.getenv('ALLOW_FILE_URI', False):
+        if re.search(r'^file', watch.get('url', ''), re.IGNORECASE) and not os.getenv('ALLOW_FILE_URI', False):
             raise Exception(
                 "file:// type access is denied for security reasons."
             )
@@ -58,10 +59,10 @@ class perform_site_check():
         # Unset any existing notification error
         update_obj = {'last_notification_error': False, 'last_error': False}
 
-        extra_headers =self.datastore.data['watching'][uuid].get('headers')
+        extra_headers = watch.get('headers', [])
 
         # Tweak the base config with the per-watch ones
-        request_headers = self.datastore.data['settings']['headers'].copy()
+        request_headers = deepcopy(self.datastore.data['settings']['headers'])
         request_headers.update(extra_headers)
 
         # https://github.com/psf/requests/issues/4525
@@ -85,7 +86,7 @@ class perform_site_check():
             is_source = True
 
         # Pluggable content fetcher
-        prefer_backend = watch['fetch_backend']
+        prefer_backend = watch.get('fetch_backend')
         if hasattr(content_fetcher, prefer_backend):
             klass = getattr(content_fetcher, prefer_backend)
         else:
@@ -96,21 +97,21 @@ class perform_site_check():
         proxy_url = None
         if proxy_id:
             proxy_url = self.datastore.proxy_list.get(proxy_id).get('url')
-            print ("UUID {} Using proxy {}".format(uuid, proxy_url))
+            print("UUID {} Using proxy {}".format(uuid, proxy_url))
 
         fetcher = klass(proxy_override=proxy_url)
 
         # Configurable per-watch or global extra delay before extracting text (for webDriver types)
         system_webdriver_delay = self.datastore.data['settings']['application'].get('webdriver_delay', None)
         if watch['webdriver_delay'] is not None:
-            fetcher.render_extract_delay = watch['webdriver_delay']
+            fetcher.render_extract_delay = watch.get('webdriver_delay')
         elif system_webdriver_delay is not None:
             fetcher.render_extract_delay = system_webdriver_delay
 
-        if watch['webdriver_js_execute_code'] is not None and watch['webdriver_js_execute_code'].strip():
-            fetcher.webdriver_js_execute_code = watch['webdriver_js_execute_code']
+        if watch.get('webdriver_js_execute_code') is not None and watch.get('webdriver_js_execute_code').strip():
+            fetcher.webdriver_js_execute_code = watch.get('webdriver_js_execute_code')
 
-        fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_codes, watch['include_filters'])
+        fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_codes, watch.get('include_filters'))
         fetcher.quit()
 
         self.screenshot = fetcher.screenshot
@@ -135,7 +136,7 @@ class perform_site_check():
         is_json = False
 
         include_filters_rule = watch.get('include_filters', [])
-        #include_filters_rule = watch['include_filters']
+        # include_filters_rule = watch['include_filters']
         subtractive_selectors = watch.get(
             "subtractive_selectors", []
         ) + self.datastore.data["settings"]["application"].get(
@@ -157,7 +158,7 @@ class perform_site_check():
             is_html = False
 
         if is_html or is_source:
-
+            # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
             fetcher.content = html_tools.workarounds_for_obfuscations(fetcher.content)
             html_content = fetcher.content
 
@@ -179,8 +180,8 @@ class perform_site_check():
                 else:
                     # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
                     html_content += html_tools.include_filters(include_filters=filter_rule,
-                                                             html_content=fetcher.content,
-                                                             append_pretty_line_formatting=not is_source)
+                                                               html_content=fetcher.content,
+                                                               append_pretty_line_formatting=not is_source)
 
             if not html_content.strip():
                 raise FilterNotFoundInResponse(include_filters_rule)
@@ -192,12 +193,11 @@ class perform_site_check():
                 stripped_text_from_html = html_content
             else:
                 # extract text
+                do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
                 stripped_text_from_html = \
                     html_tools.html_to_text(
                         html_content,
-                        render_anchor_tag_content=self.datastore.data["settings"][
-                            "application"].get(
-                            "render_anchor_tag_content", False)
+                        render_anchor_tag_content=do_anchor
                     )
 
         # Re #340 - return the content before the 'ignore text' was applied
@@ -232,7 +232,7 @@ class perform_site_check():
 
             for l in result:
                 if type(l) is tuple:
-                    #@todo - some formatter option default (between groups)
+                    # @todo - some formatter option default (between groups)
                     regex_matched_output += list(l) + [b'\n']
                 else:
                     # @todo - some formatter option default (between each ungrouped result)
@@ -246,7 +246,6 @@ class perform_site_check():
             stripped_text_from_html = b''.join(regex_matched_output)
             text_content_before_ignored_filter = stripped_text_from_html
 
-
         # Re #133 - if we should strip whitespaces from triggering the change detected comparison
         if self.datastore.data['settings']['application'].get('ignore_whitespace', False):
             fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest()
@@ -256,29 +255,30 @@ class perform_site_check():
 
         ############ Blocking rules, after checksum #################
         blocked = False
 
-        if len(watch['trigger_text']):
+        trigger_text = watch.get('trigger_text', [])
+        if len(trigger_text):
            # Assume blocked
            blocked = True
            # Filter and trigger works the same, so reuse it
            # It should return the line numbers that match
            result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
-                                                 wordlist=watch['trigger_text'],
+                                                 wordlist=trigger_text,
                                                  mode="line numbers")
            # Unblock if the trigger was found
            if result:
                blocked = False
-
-        if len(watch['text_should_not_be_present']):
+        text_should_not_be_present = watch.get('text_should_not_be_present', [])
+        if len(text_should_not_be_present):
             # If anything matched, then we should block a change from happening
             result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
-                                                  wordlist=watch['text_should_not_be_present'],
+                                                  wordlist=text_should_not_be_present,
                                                   mode="line numbers")
             if result:
                 blocked = True
 
         # The main thing that all this at the moment comes down to :)
-        if watch['previous_md5'] != fetched_md5:
+        if watch.get('previous_md5') != fetched_md5:
             changed_detected = True
 
         # Looks like something changed, but did it match all the rules?
@@ -287,7 +287,7 @@ class perform_site_check():
 
         # Extract title as title
         if is_html:
-            if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']:
+            if self.datastore.data['settings']['application'].get('extract_title_as_title') or watch['extract_title_as_title']:
                 if not watch['title'] or not len(watch['title']):
                     update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)
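
-- 
Note, placed below the diff so it stays out of the commit itself: besides the
deepcopy, this change consistently swaps watch['key'] for
watch.get('key', default), so a watch record that lacks a key (for example,
one saved by an older version) no longer raises KeyError mid-check and falls
back to a sane default instead. An illustrative sketch using a hypothetical
older record, not data from this repository:

    watch = {'url': 'https://example.com'}    # hypothetical record, 'trigger_text' never saved

    print(watch.get('trigger_text', []))      # -> [] ; the check carries on
    try:
        watch['trigger_text']                 # the old-style access
    except KeyError as e:
        print('old-style lookup raised', e)   # -> KeyError: 'trigger_text'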