More works and tests

image-binary-support
dgtlmoon 3 years ago
parent 9bc71d187e
commit 499c4797da

@ -5,8 +5,9 @@ from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.proxy import Proxy as SeleniumProxy
from selenium.common.exceptions import WebDriverException
import urllib3.exceptions
# image/jpeg etc
supported_binary_types = ['image']
class EmptyReply(Exception):
def __init__(self, status_code, url):
@ -51,6 +52,15 @@ class Fetcher():
# def return_diff(self, stream_a, stream_b):
# return
# Assume we dont support it as binary if its not in our list
def supported_binary_type(content_type):
# Not a binary thing we support? then use text (also used for JSON/XML etc)
# @todo - future - use regex for matching
if content_type and content_type.lower().strip().split('/')[0] not in (string.lower() for string in supported_binary_types):
return False
return True
def available_fetchers():
import inspect
from changedetectionio import content_fetcher
@ -156,15 +166,18 @@ class html_requests(Fetcher):
verify=False)
# https://stackoverflow.com/questions/44203397/python-requests-get-returns-improperly-decoded-text-instead-of-utf-8
# Return bytes here
html = r.text
if not supported_binary_type(r.headers.get('Content-Type', '')):
content = r.text
else:
content = r.content
# @todo test this
# @todo maybe you really want to test zero-byte return pages?
if not r or not html or not len(html):
if not r or not content or not len(content):
raise EmptyReply(url=url, status_code=r.status_code)
self.status_code = r.status_code
self.content = html
self.content = content
self.headers = r.headers

@ -57,7 +57,7 @@ class perform_site_check():
stripped_text_from_html = ""
fetched_md5 = ""
text_content_before_ignored_filter = False
original_content_before_filters = False
watch = self.datastore.data['watching'][uuid]
@ -106,13 +106,16 @@ class perform_site_check():
# https://stackoverflow.com/questions/41817578/basic-method-chaining ?
# return content().textfilter().jsonextract().checksumcompare() ?
update_obj['content-type'] = fetcher.headers.get('Content-Type', '').lower().strip()
is_json = update_obj['content-type'] == 'application/json'
is_text_or_html = 'text' in update_obj['content-type']
is_binary = 'image' in update_obj['content-type']
is_binary = content_fetcher.supported_binary_type(update_obj['content-type'])
css_filter_rule = watch['css_filter']
has_filter_rule = css_filter_rule and len(css_filter_rule.strip())
# Make it reformat the JSON to something nice
if is_json and not has_filter_rule:
css_filter_rule = "json:$"
has_filter_rule = True
@ -120,7 +123,7 @@ class perform_site_check():
if has_filter_rule:
if 'json:' in css_filter_rule:
stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule)
is_html = False
is_text_or_html = False
if is_text_or_html:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
@ -142,7 +145,7 @@ class perform_site_check():
stripped_text_from_html = html_content
# Re #340 - return the content before the 'ignore text' was applied
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
original_content_before_filters = stripped_text_from_html.encode('utf-8')
# We rely on the actual text in the html output.. many sites have random script vars etc,
# in the future we'll implement other mechanisms.
@ -159,8 +162,6 @@ class perform_site_check():
else:
stripped_text_from_html = stripped_text_from_html.encode('utf8')
if is_text_or_html:
# Re #133 - if we should strip whitespaces from triggering the change detected comparison
if self.datastore.data['settings']['application'].get('ignore_whitespace', False):
fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest()
@ -175,8 +176,11 @@ class perform_site_check():
# Goal here in the future is to be able to abstract out different content type checks into their own class
if is_binary:
fetched_md5 = hashlib.md5(fetcher.content)
text_content_before_ignored_filter = fetcher.content
# @todo - use some actual image hash here where possible, audio hash, etc etc
m = hashlib.sha256()
m.update(fetcher.content)
fetched_md5 = m.hexdigest()
original_content_before_filters = fetcher.content
# On the first run of a site, watch['previous_md5'] will be an empty string, set it the current one.
if not len(watch['previous_md5']):
@ -208,5 +212,5 @@ class perform_site_check():
update_obj["last_changed"] = timestamp
# text_content_before_ignored_filter is returned for saving the data to disk
return changed_detected, update_obj, text_content_before_ignored_filter
# original_content_before_filters is returned for saving the data to disk
return changed_detected, update_obj, original_content_before_filters

@ -0,0 +1,56 @@
#!/usr/bin/python3
import time
import secrets
from flask import url_for
from . util import live_server_setup
def test_binary_file_change(client, live_server):
with open("test-datastore/test.bin", "wb") as f:
f.write(secrets.token_bytes())
live_server_setup(live_server)
sleep_time_for_fetch_thread = 3
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_binaryfile_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
assert b'/test-binary-endpoint' in res.data
# Make a change
with open("test-datastore/test.bin", "wb") as f:
f.write(secrets.token_bytes())
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'unviewed' in res.data

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.3 KiB

@ -37,6 +37,16 @@ def set_modified_response():
def live_server_setup(live_server):
@live_server.app.route('/test-binary-endpoint')
def test_binaryfile_endpoint():
from flask import make_response
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open("test-datastore/test.bin", "rb") as f:
resp = make_response(f.read())
resp.headers['Content-Type'] = 'image/jpeg'
return resp
@live_server.app.route('/test-endpoint')
def test_endpoint():

Loading…
Cancel
Save