Split out content type methods

image-binary-support
dgtlmoon 3 years ago
parent 536948c8c6
commit 9bc71d187e

@ -55,10 +55,13 @@ class perform_site_check():
changed_detected = False changed_detected = False
stripped_text_from_html = "" stripped_text_from_html = ""
fetched_md5 = ""
text_content_before_ignored_filter = False
watch = self.datastore.data['watching'][uuid] watch = self.datastore.data['watching'][uuid]
# Unset any existing notification error
# Unset any existing notification error
update_obj = {'last_notification_error': False, 'last_error': False} update_obj = {'last_notification_error': False, 'last_error': False}
extra_headers = self.datastore.get_val(uuid, 'headers') extra_headers = self.datastore.get_val(uuid, 'headers')
@ -92,6 +95,7 @@ class perform_site_check():
fetcher = klass() fetcher = klass()
fetcher.run(url, timeout, request_headers, request_body, request_method) fetcher.run(url, timeout, request_headers, request_body, request_method)
# Fetching complete, now filters # Fetching complete, now filters
# @todo move to class / maybe inside of fetcher abstract base? # @todo move to class / maybe inside of fetcher abstract base?
@ -101,9 +105,11 @@ class perform_site_check():
# - Do we convert to JSON? # - Do we convert to JSON?
# https://stackoverflow.com/questions/41817578/basic-method-chaining ? # https://stackoverflow.com/questions/41817578/basic-method-chaining ?
# return content().textfilter().jsonextract().checksumcompare() ? # return content().textfilter().jsonextract().checksumcompare() ?
update_obj['content-type'] = fetcher.headers.get('Content-Type', '').lower().strip()
is_json = update_obj['content-type'] == 'application/json'
is_text_or_html = 'text' in update_obj['content-type']
is_binary = 'image' in update_obj['content-type']
is_json = fetcher.headers.get('Content-Type', '') == 'application/json'
is_html = not is_json
css_filter_rule = watch['css_filter'] css_filter_rule = watch['css_filter']
has_filter_rule = css_filter_rule and len(css_filter_rule.strip()) has_filter_rule = css_filter_rule and len(css_filter_rule.strip())
@ -116,7 +122,7 @@ class perform_site_check():
stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule) stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule)
is_html = False is_html = False
if is_html: if is_text_or_html:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content = fetcher.content html_content = fetcher.content
if not fetcher.headers.get('Content-Type', '') == 'text/plain': if not fetcher.headers.get('Content-Type', '') == 'text/plain':
@ -135,8 +141,8 @@ class perform_site_check():
# Don't run get_text or xpath/css filters on plaintext # Don't run get_text or xpath/css filters on plaintext
stripped_text_from_html = html_content stripped_text_from_html = html_content
# Re #340 - return the content before the 'ignore text' was applied # Re #340 - return the content before the 'ignore text' was applied
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8') text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
# We rely on the actual text in the html output.. many sites have random script vars etc, # We rely on the actual text in the html output.. many sites have random script vars etc,
# in the future we'll implement other mechanisms. # in the future we'll implement other mechanisms.
@ -145,17 +151,32 @@ class perform_site_check():
# If there's text to skip # If there's text to skip
# @todo we could abstract out the get_text() to handle this cleaner # @todo we could abstract out the get_text() to handle this cleaner
text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
if len(text_to_ignore):
stripped_text_from_html = self.strip_ignore_text(stripped_text_from_html, text_to_ignore)
else:
stripped_text_from_html = stripped_text_from_html.encode('utf8')
# Re #133 - if we should strip whitespaces from triggering the change detected comparison if is_text_or_html:
if self.datastore.data['settings']['application'].get('ignore_whitespace', False): text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest() if len(text_to_ignore):
else: stripped_text_from_html = self.strip_ignore_text(stripped_text_from_html, text_to_ignore)
fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest() else:
stripped_text_from_html = stripped_text_from_html.encode('utf8')
if is_text_or_html:
# Re #133 - if we should strip whitespaces from triggering the change detected comparison
if self.datastore.data['settings']['application'].get('ignore_whitespace', False):
fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest()
else:
fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest()
# Extract title as title
if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']:
if not watch['title'] or not len(watch['title']):
update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)
# Goal here in the future is to be able to abstract out different content type checks into their own class
if is_binary:
fetched_md5 = hashlib.md5(fetcher.content)
text_content_before_ignored_filter = fetcher.content
# On the first run of a site, watch['previous_md5'] will be an empty string, set it the current one. # On the first run of a site, watch['previous_md5'] will be an empty string, set it the current one.
if not len(watch['previous_md5']): if not len(watch['previous_md5']):
@ -163,37 +184,29 @@ class perform_site_check():
update_obj["previous_md5"] = fetched_md5 update_obj["previous_md5"] = fetched_md5
blocked_by_not_found_trigger_text = False blocked_by_not_found_trigger_text = False
if is_text_or_html:
if len(watch['trigger_text']): if len(watch['trigger_text']):
blocked_by_not_found_trigger_text = True blocked_by_not_found_trigger_text = True
for line in watch['trigger_text']: for line in watch['trigger_text']:
# Because JSON wont serialize a re.compile object # Because JSON wont serialize a re.compile object
if line[0] == '/' and line[-1] == '/': if line[0] == '/' and line[-1] == '/':
regex = re.compile(line.strip('/'), re.IGNORECASE) regex = re.compile(line.strip('/'), re.IGNORECASE)
# Found it? so we don't wait for it anymore # Found it? so we don't wait for it anymore
r = re.search(regex, str(stripped_text_from_html)) r = re.search(regex, str(stripped_text_from_html))
if r: if r:
blocked_by_not_found_trigger_text = False
break
elif line.lower() in str(stripped_text_from_html).lower():
# We found it don't wait for it.
blocked_by_not_found_trigger_text = False blocked_by_not_found_trigger_text = False
break break
elif line.lower() in str(stripped_text_from_html).lower():
# We found it don't wait for it.
blocked_by_not_found_trigger_text = False
break
if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5: if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5:
changed_detected = True changed_detected = True
update_obj["previous_md5"] = fetched_md5 update_obj["previous_md5"] = fetched_md5
update_obj["last_changed"] = timestamp update_obj["last_changed"] = timestamp
# Extract title as title # text_content_before_ignored_filter is returned for saving the data to disk
if is_html:
if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']:
if not watch['title'] or not len(watch['title']):
update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)
return changed_detected, update_obj, text_content_before_ignored_filter return changed_detected, update_obj, text_content_before_ignored_filter

Loading…
Cancel
Save