From 9bc71d187e3fa21075943c014189912f73c219b0 Mon Sep 17 00:00:00 2001 From: dgtlmoon Date: Sat, 12 Feb 2022 17:21:25 +0100 Subject: [PATCH] Split out content type methods --- changedetectionio/fetch_site_status.py | 93 +++++++++++++++----------- 1 file changed, 53 insertions(+), 40 deletions(-) diff --git a/changedetectionio/fetch_site_status.py b/changedetectionio/fetch_site_status.py index d75c0c6e..df4af2d5 100644 --- a/changedetectionio/fetch_site_status.py +++ b/changedetectionio/fetch_site_status.py @@ -55,10 +55,13 @@ class perform_site_check(): changed_detected = False stripped_text_from_html = "" + fetched_md5 = "" + + text_content_before_ignored_filter = False watch = self.datastore.data['watching'][uuid] - # Unset any existing notification error + # Unset any existing notification error update_obj = {'last_notification_error': False, 'last_error': False} extra_headers = self.datastore.get_val(uuid, 'headers') @@ -92,6 +95,7 @@ class perform_site_check(): fetcher = klass() fetcher.run(url, timeout, request_headers, request_body, request_method) + # Fetching complete, now filters # @todo move to class / maybe inside of fetcher abstract base? @@ -101,9 +105,11 @@ class perform_site_check(): # - Do we convert to JSON? # https://stackoverflow.com/questions/41817578/basic-method-chaining ? # return content().textfilter().jsonextract().checksumcompare() ? + update_obj['content-type'] = fetcher.headers.get('Content-Type', '').lower().strip() + is_json = update_obj['content-type'] == 'application/json' + is_text_or_html = 'text' in update_obj['content-type'] + is_binary = 'image' in update_obj['content-type'] - is_json = fetcher.headers.get('Content-Type', '') == 'application/json' - is_html = not is_json css_filter_rule = watch['css_filter'] has_filter_rule = css_filter_rule and len(css_filter_rule.strip()) @@ -116,7 +122,7 @@ class perform_site_check(): stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule) is_html = False - if is_html: + if is_text_or_html: # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text html_content = fetcher.content if not fetcher.headers.get('Content-Type', '') == 'text/plain': @@ -135,8 +141,8 @@ class perform_site_check(): # Don't run get_text or xpath/css filters on plaintext stripped_text_from_html = html_content - # Re #340 - return the content before the 'ignore text' was applied - text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8') + # Re #340 - return the content before the 'ignore text' was applied + text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8') # We rely on the actual text in the html output.. many sites have random script vars etc, # in the future we'll implement other mechanisms. @@ -145,17 +151,32 @@ class perform_site_check(): # If there's text to skip # @todo we could abstract out the get_text() to handle this cleaner - text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', []) - if len(text_to_ignore): - stripped_text_from_html = self.strip_ignore_text(stripped_text_from_html, text_to_ignore) - else: - stripped_text_from_html = stripped_text_from_html.encode('utf8') - # Re #133 - if we should strip whitespaces from triggering the change detected comparison - if self.datastore.data['settings']['application'].get('ignore_whitespace', False): - fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest() - else: - fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest() + if is_text_or_html: + text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', []) + if len(text_to_ignore): + stripped_text_from_html = self.strip_ignore_text(stripped_text_from_html, text_to_ignore) + else: + stripped_text_from_html = stripped_text_from_html.encode('utf8') + + + if is_text_or_html: + # Re #133 - if we should strip whitespaces from triggering the change detected comparison + if self.datastore.data['settings']['application'].get('ignore_whitespace', False): + fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest() + else: + fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest() + + # Extract title as title + if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']: + if not watch['title'] or not len(watch['title']): + update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content) + + # Goal here in the future is to be able to abstract out different content type checks into their own class + + if is_binary: + fetched_md5 = hashlib.md5(fetcher.content) + text_content_before_ignored_filter = fetcher.content # On the first run of a site, watch['previous_md5'] will be an empty string, set it the current one. if not len(watch['previous_md5']): @@ -163,37 +184,29 @@ class perform_site_check(): update_obj["previous_md5"] = fetched_md5 blocked_by_not_found_trigger_text = False - - if len(watch['trigger_text']): - blocked_by_not_found_trigger_text = True - for line in watch['trigger_text']: - # Because JSON wont serialize a re.compile object - if line[0] == '/' and line[-1] == '/': - regex = re.compile(line.strip('/'), re.IGNORECASE) - # Found it? so we don't wait for it anymore - r = re.search(regex, str(stripped_text_from_html)) - if r: + if is_text_or_html: + if len(watch['trigger_text']): + blocked_by_not_found_trigger_text = True + for line in watch['trigger_text']: + # Because JSON wont serialize a re.compile object + if line[0] == '/' and line[-1] == '/': + regex = re.compile(line.strip('/'), re.IGNORECASE) + # Found it? so we don't wait for it anymore + r = re.search(regex, str(stripped_text_from_html)) + if r: + blocked_by_not_found_trigger_text = False + break + + elif line.lower() in str(stripped_text_from_html).lower(): + # We found it don't wait for it. blocked_by_not_found_trigger_text = False break - elif line.lower() in str(stripped_text_from_html).lower(): - # We found it don't wait for it. - blocked_by_not_found_trigger_text = False - break - - - if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5: changed_detected = True update_obj["previous_md5"] = fetched_md5 update_obj["last_changed"] = timestamp - # Extract title as title - if is_html: - if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']: - if not watch['title'] or not len(watch['title']): - update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content) - - + # text_content_before_ignored_filter is returned for saving the data to disk return changed_detected, update_obj, text_content_before_ignored_filter