Adding support for change detection of HTML source-code via "source:https://website.com" prefix (#540)

pull/549/head
dgtlmoon 3 years ago committed by GitHub
parent d7ed7c44ed
commit 380c512cc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -39,11 +39,12 @@ Free, Open-source web page monitoring, notification and change detection. Don't
- COVID related news from government websites - COVID related news from government websites
- University/organisation news from their website - University/organisation news from their website
- Detect and monitor changes in JSON API responses - Detect and monitor changes in JSON API responses
- API monitoring and alerting - JSON API monitoring and alerting
- Changes in legal and other documents - Changes in legal and other documents
- Trigger API calls via notifications when text appears on a website - Trigger API calls via notifications when text appears on a website
- Glue together APIs using the JSON filter and JSON notifications - Glue together APIs using the JSON filter and JSON notifications
- Create RSS feeds based on changes in web content - Create RSS feeds based on changes in web content
- Monitor HTML source code for unexpected changes, strengthen your PCI compliance
- You have a very sensitive list of URLs to watch and you do _not_ want to use the paid alternatives. (Remember, _you_ are the product) - You have a very sensitive list of URLs to watch and you do _not_ want to use the paid alternatives. (Remember, _you_ are the product)
_Need an actual Chrome runner with Javascript support? We support fetching via WebDriver!</a>_ _Need an actual Chrome runner with Javascript support? We support fetching via WebDriver!</a>_

@ -708,7 +708,7 @@ def changedetection_app(config=None, datastore_o=None):
url = url.strip() url = url.strip()
url, *tags = url.split(" ") url, *tags = url.split(" ")
# Flask wtform validators wont work with basic auth, use validators package # Flask wtform validators wont work with basic auth, use validators package
if len(url) and validators.url(url): if len(url) and validators.url(url.replace('source:', '')):
new_uuid = datastore.add_watch(url=url.strip(), tag=" ".join(tags)) new_uuid = datastore.add_watch(url=url.strip(), tag=" ".join(tags))
# Straight into the queue. # Straight into the queue.
update_q.put(new_uuid) update_q.put(new_uuid)

@ -52,6 +52,12 @@ class perform_site_check():
request_method = self.datastore.get_val(uuid, 'method') request_method = self.datastore.get_val(uuid, 'method')
ignore_status_code = self.datastore.get_val(uuid, 'ignore_status_codes') ignore_status_code = self.datastore.get_val(uuid, 'ignore_status_codes')
# source: support
is_source = False
if url.startswith('source:'):
url = url.replace('source:', '')
is_source = True
# Pluggable content fetcher # Pluggable content fetcher
prefer_backend = watch['fetch_backend'] prefer_backend = watch['fetch_backend']
if hasattr(content_fetcher, prefer_backend): if hasattr(content_fetcher, prefer_backend):
@ -60,7 +66,6 @@ class perform_site_check():
# If the klass doesnt exist, just use a default # If the klass doesnt exist, just use a default
klass = getattr(content_fetcher, "html_requests") klass = getattr(content_fetcher, "html_requests")
fetcher = klass() fetcher = klass()
fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_code) fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_code)
# Fetching complete, now filters # Fetching complete, now filters
@ -75,6 +80,12 @@ class perform_site_check():
is_json = 'application/json' in fetcher.headers.get('Content-Type', '') is_json = 'application/json' in fetcher.headers.get('Content-Type', '')
is_html = not is_json is_html = not is_json
# source: support, basically treat it as plaintext
if is_source:
is_html = False
is_json = False
css_filter_rule = watch['css_filter'] css_filter_rule = watch['css_filter']
subtractive_selectors = watch.get( subtractive_selectors = watch.get(
"subtractive_selectors", [] "subtractive_selectors", []
@ -94,7 +105,7 @@ class perform_site_check():
stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule) stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule)
is_html = False is_html = False
if is_html: if is_html or is_source:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content = fetcher.content html_content = fetcher.content
@ -113,6 +124,8 @@ class perform_site_check():
html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content) html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)
if has_subtractive_selectors: if has_subtractive_selectors:
html_content = html_tools.element_removal(subtractive_selectors, html_content) html_content = html_tools.element_removal(subtractive_selectors, html_content)
if not is_source:
# extract text # extract text
stripped_text_from_html = \ stripped_text_from_html = \
html_tools.html_to_text( html_tools.html_to_text(
@ -122,6 +135,13 @@ class perform_site_check():
"render_anchor_tag_content", False) "render_anchor_tag_content", False)
) )
elif is_source:
stripped_text_from_html = html_content
# Re #340 - return the content before the 'ignore text' was applied
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
# Re #340 - return the content before the 'ignore text' was applied # Re #340 - return the content before the 'ignore text' was applied
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8') text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
@ -161,13 +181,11 @@ class perform_site_check():
if result: if result:
blocked_by_not_found_trigger_text = False blocked_by_not_found_trigger_text = False
if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5: if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5:
changed_detected = True changed_detected = True
update_obj["previous_md5"] = fetched_md5 update_obj["previous_md5"] = fetched_md5
update_obj["last_changed"] = timestamp update_obj["last_changed"] = timestamp
# Extract title as title # Extract title as title
if is_html: if is_html:
if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']: if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']:

@ -51,7 +51,7 @@
<td class="inline paused-state state-{{watch.paused}}"><a href="{{url_for('index', pause=watch.uuid, tag=active_tag)}}"><img src="{{url_for('static_content', group='images', filename='pause.svg')}}" alt="Pause" title="Pause"/></a></td> <td class="inline paused-state state-{{watch.paused}}"><a href="{{url_for('index', pause=watch.uuid, tag=active_tag)}}"><img src="{{url_for('static_content', group='images', filename='pause.svg')}}" alt="Pause" title="Pause"/></a></td>
<td class="title-col inline">{{watch.title if watch.title is not none and watch.title|length > 0 else watch.url}} <td class="title-col inline">{{watch.title if watch.title is not none and watch.title|length > 0 else watch.url}}
<a class="external" target="_blank" rel="noopener" href="{{ watch.url }}"></a> <a class="external" target="_blank" rel="noopener" href="{{ watch.url.replace('source:','') }}"></a>
{%if watch.fetch_backend == "html_webdriver" %}<img style="height: 1em; display:inline-block;" src="{{url_for('static_content', group='images', filename='Google-Chrome-icon.png')}}" />{% endif %} {%if watch.fetch_backend == "html_webdriver" %}<img style="height: 1em; display:inline-block;" src="{{url_for('static_content', group='images', filename='Google-Chrome-icon.png')}}" />{% endif %}
{% if watch.last_error is defined and watch.last_error != False %} {% if watch.last_error is defined and watch.last_error != False %}

@ -50,6 +50,14 @@ def test_check_basic_change_detection_functionality(client, live_server):
##################### #####################
# Check HTML conversion was detected and worked
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True
)
# Check this class does not appear (i.e. we didn't see the raw source)
assert b'foobar-detection' not in res.data
# Make a change # Make a change
set_modified_response() set_modified_response()

@ -0,0 +1,95 @@
#!/usr/bin/python3
import time
from flask import url_for
from urllib.request import urlopen
from .util import set_original_response, set_modified_response, live_server_setup
sleep_time_for_fetch_thread = 3
def test_setup(live_server):
    # Module-level bootstrap: registers routes/state on the shared pytest-flask
    # live server so the tests below can fetch from it.
    live_server_setup(live_server)
def test_check_basic_change_detection_functionality_source(client, live_server):
    """A 'source:' watch should capture the raw HTML, detect changes in it,
    and show the escaped markup in the diff view."""
    set_original_response()
    test_url = 'source:' + url_for('test_endpoint', _external=True)

    # Register the watch via the import page
    import_res = client.post(
        url_for("import_page"),
        data={"urls": test_url},
        follow_redirects=True,
    )
    assert b"1 Imported" in import_res.data

    # Give the background fetch thread time to run
    time.sleep(sleep_time_for_fetch_thread)

    #####################
    # The preview should show the raw source, so the hidden marker class
    # DOES appear (plain-text rendering would have stripped it)
    preview_res = client.get(
        url_for("preview_page", uuid="first"),
        follow_redirects=True,
    )
    assert b'foobar-detection' in preview_res.data

    # Swap in modified content and force a recheck
    set_modified_response()
    recheck_res = client.get(url_for("api_watch_checknow"), follow_redirects=True)
    assert b'1 watches are queued for rechecking.' in recheck_res.data
    time.sleep(5)

    # The index should now flag the watch as having an unviewed change
    index_res = client.get(url_for("index"))
    assert b'unviewed' in index_res.data

    # The diff view should contain the HTML-escaped <title> tag, proving the
    # comparison ran against source markup rather than extracted text
    diff_res = client.get(
        url_for("diff_history_page", uuid="first"),
        follow_redirects=True,
    )
    assert b'&lt;title&gt;modified head title' in diff_res.data
def test_check_ignore_elements(client, live_server):
    """CSS filters and subtractive selectors should also apply to 'source:'
    watches: keep only <span>/<p>, then strip the .foobar-detection span."""
    set_original_response()
    time.sleep(2)
    test_url = 'source:' + url_for('test_endpoint', _external=True)

    # Register the watch via the import page
    import_res = client.post(
        url_for("import_page"),
        data={"urls": test_url},
        follow_redirects=True,
    )
    assert b"1 Imported" in import_res.data

    time.sleep(sleep_time_for_fetch_thread)

    #####################
    # Restrict to <span> and <p> only, while subtracting .foobar-detection
    client.post(
        url_for("edit_page", uuid="first"),
        data={
            "css_filter": 'span,p',
            "url": test_url,
            "tag": "",
            "subtractive_selectors": ".foobar-detection",
            'fetch_backend': "html_requests",
        },
        follow_redirects=True,
    )

    time.sleep(sleep_time_for_fetch_thread)

    preview_res = client.get(
        url_for("preview_page", uuid="first"),
        follow_redirects=True,
    )
    # Subtracted span is gone, <br> was filtered out, <p> survived the filter
    assert b'foobar-detection' not in preview_res.data
    assert b'&lt;br' not in preview_res.data
    assert b'&lt;p' in preview_res.data

@ -10,6 +10,7 @@ def set_original_response():
<p>Which is across multiple lines</p> <p>Which is across multiple lines</p>
</br> </br>
So let's see what happens. </br> So let's see what happens. </br>
<span class="foobar-detection" style='display:none'></span>
</body> </body>
</html> </html>
""" """

Loading…
Cancel
Save