HTTP Fetcher code improvements

pull/540/head
dgtlmoon 3 years ago committed by GitHub
parent 4074fe53f1
commit 34a87c0f41
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -46,139 +46,137 @@ class perform_site_check():
if 'Accept-Encoding' in request_headers and "br" in request_headers['Accept-Encoding']: if 'Accept-Encoding' in request_headers and "br" in request_headers['Accept-Encoding']:
request_headers['Accept-Encoding'] = request_headers['Accept-Encoding'].replace(', br', '') request_headers['Accept-Encoding'] = request_headers['Accept-Encoding'].replace(', br', '')
# @todo check the failures are really handled how we expect timeout = self.datastore.data['settings']['requests']['timeout']
url = self.datastore.get_val(uuid, 'url')
request_body = self.datastore.get_val(uuid, 'body')
request_method = self.datastore.get_val(uuid, 'method')
ignore_status_code = self.datastore.get_val(uuid, 'ignore_status_codes')
# Pluggable content fetcher
prefer_backend = watch['fetch_backend']
if hasattr(content_fetcher, prefer_backend):
klass = getattr(content_fetcher, prefer_backend)
else: else:
timeout = self.datastore.data['settings']['requests']['timeout'] # If the klass doesnt exist, just use a default
url = self.datastore.get_val(uuid, 'url') klass = getattr(content_fetcher, "html_requests")
request_body = self.datastore.get_val(uuid, 'body')
request_method = self.datastore.get_val(uuid, 'method')
ignore_status_code = self.datastore.get_val(uuid, 'ignore_status_codes') fetcher = klass()
fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_code)
# Pluggable content fetcher # Fetching complete, now filters
prefer_backend = watch['fetch_backend'] # @todo move to class / maybe inside of fetcher abstract base?
if hasattr(content_fetcher, prefer_backend):
klass = getattr(content_fetcher, prefer_backend) # @note: I feel like the following should be in a more obvious chain system
else: # - Check filter text
# If the klass doesnt exist, just use a default # - Is the checksum different?
klass = getattr(content_fetcher, "html_requests") # - Do we convert to JSON?
# https://stackoverflow.com/questions/41817578/basic-method-chaining ?
# return content().textfilter().jsonextract().checksumcompare() ?
fetcher = klass()
fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_code) is_json = 'application/json' in fetcher.headers.get('Content-Type', '')
# Fetching complete, now filters is_html = not is_json
# @todo move to class / maybe inside of fetcher abstract base? css_filter_rule = watch['css_filter']
subtractive_selectors = watch.get(
# @note: I feel like the following should be in a more obvious chain system "subtractive_selectors", []
# - Check filter text ) + self.datastore.data["settings"]["application"].get(
# - Is the checksum different? "global_subtractive_selectors", []
# - Do we convert to JSON? )
# https://stackoverflow.com/questions/41817578/basic-method-chaining ?
# return content().textfilter().jsonextract().checksumcompare() ? has_filter_rule = css_filter_rule and len(css_filter_rule.strip())
has_subtractive_selectors = subtractive_selectors and len(subtractive_selectors[0].strip())
is_json = 'application/json' in fetcher.headers.get('Content-Type', '')
is_html = not is_json if is_json and not has_filter_rule:
css_filter_rule = watch['css_filter'] css_filter_rule = "json:$"
subtractive_selectors = watch.get( has_filter_rule = True
"subtractive_selectors", []
) + self.datastore.data["settings"]["application"].get( if has_filter_rule:
"global_subtractive_selectors", [] if 'json:' in css_filter_rule:
) stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule)
is_html = False
has_filter_rule = css_filter_rule and len(css_filter_rule.strip())
has_subtractive_selectors = subtractive_selectors and len(subtractive_selectors[0].strip()) if is_html:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
if is_json and not has_filter_rule: html_content = fetcher.content
css_filter_rule = "json:$"
has_filter_rule = True # If not JSON, and if it's not text/plain..
if 'text/plain' in fetcher.headers.get('Content-Type', '').lower():
if has_filter_rule: # Don't run get_text or xpath/css filters on plaintext
if 'json:' in css_filter_rule: stripped_text_from_html = html_content
stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule)
is_html = False
if is_html:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content = fetcher.content
# If not JSON, and if it's not text/plain..
if 'text/plain' in fetcher.headers.get('Content-Type', '').lower():
# Don't run get_text or xpath/css filters on plaintext
stripped_text_from_html = html_content
else:
# Then we assume HTML
if has_filter_rule:
# For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
if css_filter_rule[0] == '/':
html_content = html_tools.xpath_filter(xpath_filter=css_filter_rule, html_content=fetcher.content)
else:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)
if has_subtractive_selectors:
html_content = html_tools.element_removal(subtractive_selectors, html_content)
# extract text
stripped_text_from_html = \
html_tools.html_to_text(
html_content,
render_anchor_tag_content=self.datastore.data["settings"][
"application"].get(
"render_anchor_tag_content", False)
)
# Re #340 - return the content before the 'ignore text' was applied
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
# We rely on the actual text in the html output.. many sites have random script vars etc,
# in the future we'll implement other mechanisms.
update_obj["last_check_status"] = fetcher.get_last_status_code()
# If there's text to skip
# @todo we could abstract out the get_text() to handle this cleaner
text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
if len(text_to_ignore):
stripped_text_from_html = html_tools.strip_ignore_text(stripped_text_from_html, text_to_ignore)
else: else:
stripped_text_from_html = stripped_text_from_html.encode('utf8') # Then we assume HTML
if has_filter_rule:
# For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
if css_filter_rule[0] == '/':
html_content = html_tools.xpath_filter(xpath_filter=css_filter_rule, html_content=fetcher.content)
else:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)
if has_subtractive_selectors:
html_content = html_tools.element_removal(subtractive_selectors, html_content)
# extract text
stripped_text_from_html = \
html_tools.html_to_text(
html_content,
render_anchor_tag_content=self.datastore.data["settings"][
"application"].get(
"render_anchor_tag_content", False)
)
# Re #340 - return the content before the 'ignore text' was applied
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
# We rely on the actual text in the html output.. many sites have random script vars etc,
# in the future we'll implement other mechanisms.
update_obj["last_check_status"] = fetcher.get_last_status_code()
# If there's text to skip
# @todo we could abstract out the get_text() to handle this cleaner
text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
if len(text_to_ignore):
stripped_text_from_html = html_tools.strip_ignore_text(stripped_text_from_html, text_to_ignore)
else:
stripped_text_from_html = stripped_text_from_html.encode('utf8')
# Re #133 - if we should strip whitespaces from triggering the change detected comparison # Re #133 - if we should strip whitespaces from triggering the change detected comparison
if self.datastore.data['settings']['application'].get('ignore_whitespace', False): if self.datastore.data['settings']['application'].get('ignore_whitespace', False):
fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest() fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest()
else: else:
fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest() fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest()
# On the first run of a site, watch['previous_md5'] will be an empty string, set it the current one. # On the first run of a site, watch['previous_md5'] will be an empty string, set it the current one.
if not len(watch['previous_md5']): if not len(watch['previous_md5']):
watch['previous_md5'] = fetched_md5 watch['previous_md5'] = fetched_md5
update_obj["previous_md5"] = fetched_md5 update_obj["previous_md5"] = fetched_md5
blocked_by_not_found_trigger_text = False blocked_by_not_found_trigger_text = False
if len(watch['trigger_text']): if len(watch['trigger_text']):
# Yeah, lets block first until something matches # Yeah, lets block first until something matches
blocked_by_not_found_trigger_text = True blocked_by_not_found_trigger_text = True
# Filter and trigger works the same, so reuse it # Filter and trigger works the same, so reuse it
result = html_tools.strip_ignore_text(content=str(stripped_text_from_html), result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
wordlist=watch['trigger_text'], wordlist=watch['trigger_text'],
mode="line numbers") mode="line numbers")
if result: if result:
blocked_by_not_found_trigger_text = False blocked_by_not_found_trigger_text = False
if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5: if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5:
changed_detected = True changed_detected = True
update_obj["previous_md5"] = fetched_md5 update_obj["previous_md5"] = fetched_md5
update_obj["last_changed"] = timestamp update_obj["last_changed"] = timestamp
# Extract title as title # Extract title as title
if is_html: if is_html:
if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']: if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']:
if not watch['title'] or not len(watch['title']): if not watch['title'] or not len(watch['title']):
update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content) update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)
if self.datastore.data['settings']['application'].get('real_browser_save_screenshot', True): if self.datastore.data['settings']['application'].get('real_browser_save_screenshot', True):
screenshot = fetcher.screenshot() screenshot = fetcher.screenshot()
fetcher.quit() fetcher.quit()
return changed_detected, update_obj, text_content_before_ignored_filter, screenshot return changed_detected, update_obj, text_content_before_ignored_filter, screenshot

Loading…
Cancel
Save