Filters can now accept a list/multiple filters (#1064) #623

Branch: add-check-counter
dgtlmoon authored 2 years ago, committed by GitHub
parent d0efeb9770
commit 359fc48fb4

@@ -599,7 +599,7 @@ def changedetection_app(config=None, datastore_o=None):
                 extra_update_obj['previous_md5'] = get_current_checksum_include_ignore_text(uuid=uuid)

             # Reset the previous_md5 so we process a new snapshot including stripping ignore text.
-            if form.css_filter.data.strip() != datastore.data['watching'][uuid]['css_filter']:
+            if form.include_filters.data != datastore.data['watching'][uuid].get('include_filters', []):
                 if len(datastore.data['watching'][uuid].history):
                     extra_update_obj['previous_md5'] = get_current_checksum_include_ignore_text(uuid=uuid)
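Note on the hunk above: the form field now yields a list (one rule per line), so the dirty-check becomes plain list equality instead of a stripped-string comparison. A minimal sketch with illustrative values:

```python
# Illustrative values only
stored = {'include_filters': ['#price', "xpath://span[@id='stock']"]}
submitted = ['#price', "xpath://span[@id='stock']"]  # parsed from the textarea

# Any added, removed or reordered rule resets previous_md5,
# forcing a fresh snapshot on the next check
if submitted != stored.get('include_filters', []):
    print("filters changed - resetting previous_md5")
```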

@@ -164,16 +164,16 @@ class Fetcher():
                    }
-                   // inject the current one set in the css_filter, which may be a CSS rule
+                   // inject the current one set in the include_filters, which may be a CSS rule
                    // used for displaying the current one in VisualSelector, where its not one we generated.
-                   if (css_filter.length) {
+                   if (include_filters.length) {
                        q=false;
                        try {
                            // is it xpath?
-                           if (css_filter.startsWith('/') || css_filter.startsWith('xpath:')) {
-                               q=document.evaluate(css_filter.replace('xpath:',''), document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
+                           if (include_filters.startsWith('/') || include_filters.startsWith('xpath:')) {
+                               q=document.evaluate(include_filters.replace('xpath:',''), document, null, XPathResult.FIRST_ORDERED_NODE_TYPE, null).singleNodeValue;
                            } else {
-                               q=document.querySelector(css_filter);
+                               q=document.querySelector(include_filters);
                            }
                        } catch (e) {
                            // Maybe catch DOMException and alert?
@@ -186,7 +186,7 @@ class Fetcher():
                    if (bbox && bbox['width'] >0 && bbox['height']>0) {
                        size_pos.push({
-                           xpath: css_filter,
+                           xpath: include_filters,
                            width: bbox['width'],
                            height: bbox['height'],
                            left: bbox['left'],
@@ -220,7 +220,7 @@ class Fetcher():
            request_body,
            request_method,
            ignore_status_codes=False,
-           current_css_filter=None):
+           current_include_filters=None):
        # Should set self.error, self.status_code and self.content
        pass
@@ -310,7 +310,7 @@ class base_html_playwright(Fetcher):
            request_body,
            request_method,
            ignore_status_codes=False,
-           current_css_filter=None):
+           current_include_filters=None):

        from playwright.sync_api import sync_playwright
        import playwright._impl._api_types
@@ -413,10 +413,10 @@ class base_html_playwright(Fetcher):
            self.status_code = response.status
            self.headers = response.all_headers()

-           if current_css_filter is not None:
-               page.evaluate("var css_filter={}".format(json.dumps(current_css_filter)))
+           if current_include_filters is not None:
+               page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
            else:
-               page.evaluate("var css_filter=''")
+               page.evaluate("var include_filters=''")

            self.xpath_data = page.evaluate("async () => {" + self.xpath_element_js + "}")
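For context, the injected value is JSON-encoded, so the in-page JavaScript now receives whatever the watch stores, e.g. a list:

```python
import json

current_include_filters = ['#price', "xpath://span[@id='stock']"]  # illustrative
print("var include_filters={}".format(json.dumps(current_include_filters)))
# var include_filters=["#price", "xpath://span[@id='stock']"]
```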
@@ -497,7 +497,7 @@ class base_html_webdriver(Fetcher):
            request_body,
            request_method,
            ignore_status_codes=False,
-           current_css_filter=None):
+           current_include_filters=None):

        from selenium import webdriver
        from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
@@ -573,7 +573,7 @@ class html_requests(Fetcher):
            request_body,
            request_method,
            ignore_status_codes=False,
-           current_css_filter=None):
+           current_include_filters=None):

        # Make requests use a more modern looking user-agent
        if not 'User-Agent' in request_headers:

@@ -10,6 +10,12 @@ from changedetectionio import content_fetcher, html_tools

 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

+
+class FilterNotFoundInResponse(ValueError):
+    def __init__(self, msg):
+        ValueError.__init__(self, msg)
+
+
 # Some common stuff here that can be moved to a base class
 # (set_proxy_from_list)
 class perform_site_check():
@@ -104,7 +110,7 @@ class perform_site_check():
         if watch['webdriver_js_execute_code'] is not None and watch['webdriver_js_execute_code'].strip():
             fetcher.webdriver_js_execute_code = watch['webdriver_js_execute_code']

-        fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_codes, watch['css_filter'])
+        fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_codes, watch['include_filters'])
         fetcher.quit()

         self.screenshot = fetcher.screenshot
@@ -128,25 +134,26 @@ class perform_site_check():
         is_html = False
         is_json = False

-        css_filter_rule = watch['css_filter']
+        include_filters_rule = watch['include_filters']
         subtractive_selectors = watch.get(
             "subtractive_selectors", []
         ) + self.datastore.data["settings"]["application"].get(
             "global_subtractive_selectors", []
         )

-        has_filter_rule = css_filter_rule and len(css_filter_rule.strip())
+        has_filter_rule = include_filters_rule and len("".join(include_filters_rule).strip())
         has_subtractive_selectors = subtractive_selectors and len(subtractive_selectors[0].strip())

         if is_json and not has_filter_rule:
-            css_filter_rule = "json:$"
+            include_filters_rule.append("json:$")
             has_filter_rule = True

         if has_filter_rule:
             json_filter_prefixes = ['json:', 'jq:']
-            if any(prefix in css_filter_rule for prefix in json_filter_prefixes):
-                stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, json_filter=css_filter_rule)
-                is_html = False
+            for filter in include_filters_rule:
+                if any(prefix in filter for prefix in json_filter_prefixes):
+                    stripped_text_from_html += html_tools.extract_json_as_string(content=fetcher.content, json_filter=filter)
+                    is_html = False

         if is_html or is_source:
@@ -161,18 +168,28 @@ class perform_site_check():
             else:
                 # Then we assume HTML
                 if has_filter_rule:
-                    # For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
-                    if css_filter_rule[0] == '/' or css_filter_rule.startswith('xpath:'):
-                        html_content = html_tools.xpath_filter(xpath_filter=css_filter_rule.replace('xpath:', ''),
-                                                               html_content=fetcher.content)
-                    else:
-                        # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
-                        html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)
+                    html_content = ""
+                    for filter_rule in include_filters_rule:
+                        # For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
+                        if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
+                            html_content += html_tools.xpath_filter(xpath_filter=filter_rule.replace('xpath:', ''),
+                                                                    html_content=fetcher.content,
+                                                                    append_pretty_line_formatting=not is_source)
+                        else:
+                            # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
+                            html_content += html_tools.include_filters(include_filters=filter_rule,
+                                                                       html_content=fetcher.content,
+                                                                       append_pretty_line_formatting=not is_source)
+
+                    if not html_content.strip():
+                        raise FilterNotFoundInResponse(include_filters_rule)

                 if has_subtractive_selectors:
                     html_content = html_tools.element_removal(subtractive_selectors, html_content)

-                if not is_source:
+                if is_source:
+                    stripped_text_from_html = html_content
+                else:
                     # extract text
                     stripped_text_from_html = \
                         html_tools.html_to_text(
@@ -182,9 +199,6 @@ class perform_site_check():
                             "render_anchor_tag_content", False)
                         )

-                elif is_source:
-                    stripped_text_from_html = html_content
-
         # Re #340 - return the content before the 'ignore text' was applied
         text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
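Condensed, the new flow in perform_site_check: every rule contributes its matches, and the "filter not found" error now fires only when the combined result of all rules is empty (previously html_tools raised per rule). A simplified sketch, not the exact source:

```python
from changedetectionio import html_tools
from changedetectionio.fetch_site_status import FilterNotFoundInResponse

page_html = "<html><body><div id='a'>A</div></body></html>"  # illustrative
include_filters_rule = ['#a', "xpath://*[@id='b']"]           # one CSS rule, one XPath rule

html_content = ""
for filter_rule in include_filters_rule:
    if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
        html_content += html_tools.xpath_filter(xpath_filter=filter_rule.replace('xpath:', ''),
                                                html_content=page_html)
    else:
        html_content += html_tools.include_filters(include_filters=filter_rule,
                                                   html_content=page_html)

# One matching rule is enough; the error fires only when *all* rules matched nothing
if not html_content.strip():
    raise FilterNotFoundInResponse(include_filters_rule)
```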

@@ -349,7 +349,7 @@ class watchForm(commonSettingsForm):

    time_between_check = FormField(TimeBetweenCheckForm)
-   css_filter = StringField('CSS/JSON/XPATH Filter', [ValidateCSSJSONXPATHInput()], default='')
+   include_filters = StringListField('CSS/JSONPath/JQ/XPath Filters', [ValidateCSSJSONXPATHInput()], default='')
    subtractive_selectors = StringListField('Remove elements', [ValidateCSSJSONXPATHInput(allow_xpath=False, allow_json=False)])

@@ -7,26 +7,30 @@ from typing import List
 import json
 import re

-class FilterNotFoundInResponse(ValueError):
-    def __init__(self, msg):
-        ValueError.__init__(self, msg)
+# HTML added to be sure each result matching a filter (.example) gets converted to a new line by Inscriptis
+TEXT_FILTER_LIST_LINE_SUFFIX = "<br/>"
+

 class JSONNotFound(ValueError):
     def __init__(self, msg):
         ValueError.__init__(self, msg)

 # Given a CSS Rule, and a blob of HTML, return the blob of HTML that matches
-def css_filter(css_filter, html_content):
+def include_filters(include_filters, html_content, append_pretty_line_formatting=False):
     soup = BeautifulSoup(html_content, "html.parser")
     html_block = ""
-    r = soup.select(css_filter, separator="")
-    if len(html_content) > 0 and len(r) == 0:
-        raise FilterNotFoundInResponse(css_filter)
-    for item in r:
-        html_block += str(item)
+    r = soup.select(include_filters, separator="")

-    return html_block + "\n"
+    for element in r:
+        # When there's more than 1 match, then add the suffix to separate each line
+        # And where the matched result doesn't include something that will cause Inscriptis to add a newline
+        # (This way each 'match' reliably has a new-line in the diff)
+        # Divs are converted to 4 whitespaces by inscriptis
+        if append_pretty_line_formatting and len(html_block) and not element.name in (['br', 'hr', 'div', 'p']):
+            html_block += TEXT_FILTER_LIST_LINE_SUFFIX
+
+        html_block += str(element)
+
+    return html_block

 def subtractive_css_selector(css_selector, html_content):
     soup = BeautifulSoup(html_content, "html.parser")
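A quick usage sketch of the renamed helper. The `<br/>` suffix is only inserted between matches whose tags would not already force a line break in Inscriptis, so `br`/`hr`/`div`/`p` matches are left alone:

```python
from changedetectionio import html_tools

html = '<span class="parts">Block A</span><span class="parts">Block B</span>'
blob = html_tools.include_filters(include_filters=".parts",
                                  html_content=html,
                                  append_pretty_line_formatting=True)
print(blob)
# <span class="parts">Block A</span><br/><span class="parts">Block B</span>
```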
@@ -42,25 +46,29 @@ def element_removal(selectors: List[str], html_content):

 # Return str Utf-8 of matched rules
-def xpath_filter(xpath_filter, html_content):
+def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False):
     from lxml import etree, html

     tree = html.fromstring(bytes(html_content, encoding='utf-8'))
     html_block = ""

     r = tree.xpath(xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'})
-    if len(html_content) > 0 and len(r) == 0:
-        raise FilterNotFoundInResponse(xpath_filter)

     #@note: //title/text() wont work where <title>CDATA..

     for element in r:
+        # When there's more than 1 match, then add the suffix to separate each line
+        # And where the matched result doesn't include something that will cause Inscriptis to add a newline
+        # (This way each 'match' reliably has a new-line in the diff)
+        # Divs are converted to 4 whitespaces by inscriptis
+        if append_pretty_line_formatting and len(html_block) and (not hasattr( element, 'tag' ) or not element.tag in (['br', 'hr', 'div', 'p'])):
+            html_block += TEXT_FILTER_LIST_LINE_SUFFIX
+
         if type(element) == etree._ElementStringResult:
-            html_block += str(element) + "<br/>"
+            html_block += str(element)
         elif type(element) == etree._ElementUnicodeResult:
-            html_block += str(element) + "<br/>"
+            html_block += str(element)
         else:
-            html_block += etree.tostring(element, pretty_print=True).decode('utf-8') + "<br/>"
+            html_block += etree.tostring(element, pretty_print=True).decode('utf-8')

     return html_block
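Same idea for XPath: the unconditional `+ "<br/>"` is gone and the separator is now opt-in. The extra `hasattr(element, 'tag')` guard is needed because XPath expressions such as `//span/text()` return plain strings rather than elements:

```python
from changedetectionio import html_tools

html = "<html><body><span class='p'>A</span><span class='p'>B</span></body></html>"
blob = html_tools.xpath_filter(xpath_filter="//span[@class='p']",
                               html_content=html,
                               append_pretty_line_formatting=True)
print(blob)  # the two matches separated by <br/> (plus lxml's pretty-print whitespace)
```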

@@ -103,12 +103,12 @@ class import_distill_io_json(Importer):
                 pass
             except IndexError:
                 pass
+            extras['include_filters'] = []
             try:
-                extras['css_filter'] = d_config['selections'][0]['frames'][0]['includes'][0]['expr']
                 if d_config['selections'][0]['frames'][0]['includes'][0]['type'] == 'xpath':
-                    extras['css_filter'] = 'xpath:' + extras['css_filter']
+                    extras['include_filters'].append('xpath:' + d_config['selections'][0]['frames'][0]['includes'][0]['expr'])
+                else:
+                    extras['include_filters'].append(d_config['selections'][0]['frames'][0]['includes'][0]['expr'])
             except KeyError:
                 pass
             except IndexError:

@@ -36,7 +36,7 @@ class model(dict):
             'notification_body': None,
             'notification_format': default_notification_format_for_watch,
             'notification_muted': False,
-            'css_filter': '',
+            'include_filters': [],
             'last_error': False,
             'extract_text': [],  # Extract text by regex after filters
             'subtractive_selectors': [],

@@ -50,7 +50,7 @@ $(document).ready(function() {
        state_clicked=false;
        ctx.clearRect(0, 0, c.width, c.height);
        xctx.clearRect(0, 0, c.width, c.height);
-       $("#css_filter").val('');
+       $("#include_filters").val('');
    });
@@ -68,7 +68,7 @@ $(document).ready(function() {
        xctx = c.getContext("2d");
        // redline highlight context
        ctx = c.getContext("2d");
-       current_default_xpath =$("#css_filter").val();
+       current_default_xpath =$("#include_filters").val();
        fetch_data();
        $('#selector-canvas').off("mousemove mousedown");
        // screenshot_url defined in the edit.html template
@@ -205,9 +205,9 @@ $(document).ready(function() {
            var sel = selector_data['size_pos'][current_selected_i];
            if (sel[0] == '/') {
                // @todo - not sure just checking / is right
-               $("#css_filter").val('xpath:'+sel.xpath);
+               $("#include_filters").val('xpath:'+sel.xpath);
            } else {
-               $("#css_filter").val(sel.xpath);
+               $("#include_filters").val(sel.xpath);
            }
            xctx.fillStyle = 'rgba(205,205,205,0.95)';
            xctx.strokeStyle = 'rgba(225,0,0,0.9)';

@@ -82,8 +82,13 @@ class ChangeDetectionStore:
        except (FileNotFoundError, json.decoder.JSONDecodeError):
            if include_default_watches:
                print("Creating JSON store at", self.datastore_path)
-               self.add_watch(url='https://news.ycombinator.com/', tag='Tech news')
-               self.add_watch(url='https://changedetection.io/CHANGELOG.txt', tag='changedetection.io')
+               self.add_watch(url='https://news.ycombinator.com/',
+                              tag='Tech news',
+                              extras={'fetch_backend': 'html_requests'})
+
+               self.add_watch(url='https://changedetection.io/CHANGELOG.txt',
+                              tag='changedetection.io',
+                              extras={'fetch_backend': 'html_requests'})

        self.__data['version_tag'] = version_tag
@@ -267,7 +272,7 @@ class ChangeDetectionStore:
        extras = {}
        # should always be str
        if tag is None or not tag:
-           tag=''
+           tag = ''

        # Incase these are copied across, assume it's a reference and deepcopy()
        apply_extras = deepcopy(extras)
@@ -282,17 +287,31 @@ class ChangeDetectionStore:
                res = r.json()

                # List of permissible attributes we accept from the wild internet
-               for k in ['url', 'tag',
-                         'paused', 'title',
-                         'previous_md5', 'headers',
-                         'body', 'method',
-                         'ignore_text', 'css_filter',
-                         'subtractive_selectors', 'trigger_text',
-                         'extract_title_as_title', 'extract_text',
-                         'text_should_not_be_present',
-                         'webdriver_js_execute_code']:
+               for k in [
+                   'body',
+                   'css_filter',
+                   'extract_text',
+                   'extract_title_as_title',
+                   'headers',
+                   'ignore_text',
+                   'include_filters',
+                   'method',
+                   'paused',
+                   'previous_md5',
+                   'subtractive_selectors',
+                   'tag',
+                   'text_should_not_be_present',
+                   'title',
+                   'trigger_text',
+                   'webdriver_js_execute_code',
+                   'url',
+               ]:
                    if res.get(k):
-                       apply_extras[k] = res[k]
+                       if k != 'css_filter':
+                           apply_extras[k] = res[k]
+                       else:
+                           # We renamed the field and made it a list
+                           apply_extras['include_filters'] = [res['css_filter']]

            except Exception as e:
                logging.error("Error fetching metadata for shared watch link", url, str(e))
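So a watch shared from an older install still imports cleanly; the loop above effectively does this (illustrative values):

```python
res = {'url': 'https://example.com', 'css_filter': '#price'}  # legacy shared watch
apply_extras = {}
for k in ['url', 'css_filter', 'include_filters']:
    if res.get(k):
        if k != 'css_filter':
            apply_extras[k] = res[k]
        else:
            # renamed field, now a list
            apply_extras['include_filters'] = [res['css_filter']]

assert apply_extras == {'url': 'https://example.com', 'include_filters': ['#price']}
```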
@@ -315,12 +334,13 @@ class ChangeDetectionStore:
                    del apply_extras[k]

            new_watch.update(apply_extras)
-           self.__data['watching'][new_uuid]=new_watch
+           self.__data['watching'][new_uuid] = new_watch

            self.__data['watching'][new_uuid].ensure_data_dir_exists()

            if write_to_disk_now:
                self.sync_to_json()
+
            return new_uuid

    def visualselector_data_is_ready(self, watch_uuid):
@@ -584,3 +604,14 @@ class ChangeDetectionStore:
        for v in ['User-Agent', 'Accept', 'Accept-Encoding', 'Accept-Language']:
            if self.data['settings']['headers'].get(v):
                del self.data['settings']['headers'][v]
+
+    # Convert filters to a list of filters css_filter -> include_filters
+    def update_8(self):
+        for uuid, watch in self.data['watching'].items():
+            try:
+                existing_filter = watch.get('css_filter', '')
+                if existing_filter:
+                    watch['include_filters'] = [existing_filter]
+            except:
+                continue
+        return
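The store runs its numbered `update_N` migrations on upgrade, so existing watches get their single string wrapped into the new list automatically; note the old key is left in place. Effect on one watch (illustrative):

```python
watch = {'css_filter': '#price'}

existing_filter = watch.get('css_filter', '')
if existing_filter:
    watch['include_filters'] = [existing_filter]

assert watch['include_filters'] == ['#price']
# a watch with an empty css_filter is left untouched
```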

@@ -174,15 +174,17 @@ User-Agent: wonderbra 1.0") }}
                    </div>
                </fieldset>
                <div class="pure-control-group">
-                   {% set field = render_field(form.css_filter,
-                           placeholder=".class-name or #some-id, or other CSS selector rule.",
+                   {% set field = render_field(form.include_filters,
+                           rows=5,
+                           placeholder="#example
+xpath://body/div/span[contains(@class, 'example-class')]",
                            class="m-d")
                    %}
                    {{ field }}
                    {% if '/text()' in field %}
                    <span class="pure-form-message-inline"><strong>Note!: //text() function does not work where the &lt;element&gt; contains &lt;![CDATA[]]&gt;</strong></span><br/>
                    {% endif %}
-                   <span class="pure-form-message-inline">
+                   <span class="pure-form-message-inline">One rule per line, <i>any</i> rules that matches will be used.<br/>
                        <ul>
                            <li>CSS - Limit text to this CSS rule, only text matching this CSS rule is included.</li>
                            <li>JSON - Limit text to this JSON rule, using either <a href="https://pypi.org/project/jsonpath-ng/" target="new">JSONPath</a> or <a href="https://stedolan.github.io/jq/" target="new">jq</a> (if installed).

@@ -24,7 +24,7 @@ def test_preferred_proxy(client, live_server):
    res = client.post(
        url_for("edit_page", uuid="first"),
        data={
-           "css_filter": "",
+           "include_filters": "",
            "fetch_backend": "html_requests",
            "headers": "",
            "proxy": "proxy-two",

@@ -23,7 +23,7 @@ def test_basic_auth(client, live_server):
    # Check form validation
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": "", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
+       data={"include_filters": "", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
        follow_redirects=True
    )
    assert b"Updated watch." in res.data

@@ -46,22 +46,23 @@ def set_modified_response():

 # Test that the CSS extraction works how we expect, important here is the right placing of new lines \n's
-def test_css_filter_output():
-    from changedetectionio import fetch_site_status
+def test_include_filters_output():
     from inscriptis import get_text

     # Check text with sub-parts renders correctly
     content = """<html> <body><div id="thingthing" > Some really <b>bold</b> text </div> </body> </html>"""
-    html_blob = css_filter(css_filter="#thingthing", html_content=content)
+    html_blob = include_filters(include_filters="#thingthing", html_content=content)
     text = get_text(html_blob)
     assert text == " Some really bold text"

     content = """<html> <body>
     <p>foo bar blah</p>
-    <div class="parts">Block A</div> <div class="parts">Block B</div></body>
+    <DIV class="parts">Block A</DiV> <div class="parts">Block B</DIV></body>
     </html>
     """
-    html_blob = css_filter(css_filter=".parts", html_content=content)
+    # in xPath this would be //*[@class='parts']
+    html_blob = include_filters(include_filters=".parts", html_content=content)
     text = get_text(html_blob)

     # Divs are converted to 4 whitespaces by inscriptis
@@ -69,10 +70,10 @@ def test_css_filter_output():

 # Tests the whole stack works with the CSS Filter
-def test_check_markup_css_filter_restriction(client, live_server):
+def test_check_markup_include_filters_restriction(client, live_server):
     sleep_time_for_fetch_thread = 3
-    css_filter = "#sametext"
+    include_filters = "#sametext"

     set_original_response()
@@ -98,7 +99,7 @@ def test_check_markup_css_filter_restriction(client, live_server):
     # Add our URL to the import page
     res = client.post(
         url_for("edit_page", uuid="first"),
-        data={"css_filter": css_filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
+        data={"include_filters": include_filters, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
         follow_redirects=True
     )
     assert b"Updated watch." in res.data
@@ -107,7 +108,7 @@ def test_check_markup_css_filter_restriction(client, live_server):
     res = client.get(
         url_for("edit_page", uuid="first"),
     )
-    assert bytes(css_filter.encode('utf-8')) in res.data
+    assert bytes(include_filters.encode('utf-8')) in res.data

     # Trigger a check
     client.get(url_for("form_watch_checknow"), follow_redirects=True)
@@ -126,3 +127,58 @@ def test_check_markup_css_filter_restriction(client, live_server):
     # Because it should be looking at only that 'sametext' id
     res = client.get(url_for("index"))
     assert b'unviewed' in res.data
+
+# Tests the whole stack works with the CSS Filter
+def test_check_multiple_filters(client, live_server):
+    sleep_time_for_fetch_thread = 3
+
+    include_filters = "#blob-a\r\nxpath://*[contains(@id,'blob-b')]"
+
+    with open("test-datastore/endpoint-content.txt", "w") as f:
+        f.write("""<html><body>
+     <div id="blob-a">Blob A</div>
+     <div id="blob-b">Blob B</div>
+     <div id="blob-c">Blob C</div>
+     </body>
+     </html>
+    """)
+
+    # Give the endpoint time to spin up
+    time.sleep(1)
+
+    # Add our URL to the import page
+    test_url = url_for('test_endpoint', _external=True)
+    res = client.post(
+        url_for("import_page"),
+        data={"urls": test_url},
+        follow_redirects=True
+    )
+    assert b"1 Imported" in res.data
+    time.sleep(1)
+
+    # Goto the edit page, add our ignore text
+    # Add our URL to the import page
+    res = client.post(
+        url_for("edit_page", uuid="first"),
+        data={"include_filters": include_filters,
+              "url": test_url,
+              "tag": "",
+              "headers": "",
+              'fetch_backend': "html_requests"},
+        follow_redirects=True
+    )
+    assert b"Updated watch." in res.data
+
+    # Give the thread time to pick it up
+    time.sleep(sleep_time_for_fetch_thread)
+
+    res = client.get(
+        url_for("preview_page", uuid="first"),
+        follow_redirects=True
+    )
+
+    # Only the two blobs should be here
+    assert b"Blob A" in res.data  # CSS was ok
+    assert b"Blob B" in res.data  # xPath was ok
+    assert b"Blob C" not in res.data  # Should not be included

@@ -88,7 +88,7 @@ def test_check_filter_multiline(client, live_server):
    # Add our URL to the import page
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": '',
+       data={"include_filters": '',
              'extract_text': '/something.+?6 billion.+?lines/si',
              "url": test_url,
              "tag": "",
@@ -116,7 +116,7 @@ def test_check_filter_multiline(client, live_server):
 def test_check_filter_and_regex_extract(client, live_server):
     sleep_time_for_fetch_thread = 3
-    css_filter = ".changetext"
+    include_filters = ".changetext"

     set_original_response()
@@ -143,7 +143,7 @@ def test_check_filter_and_regex_extract(client, live_server):
    # Add our URL to the import page
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": css_filter,
+       data={"include_filters": include_filters,
              'extract_text': '\d+ online\r\n\d+ guests\r\n/somecase insensitive \d+/i\r\n/somecase insensitive (345\d)/i',
              "url": test_url,
              "tag": "",

@@ -92,7 +92,7 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
              "tag": "my tag",
              "title": "my title",
              "headers": "",
-             "css_filter": '.ticket-available',
+             "include_filters": '.ticket-available',
              "fetch_backend": "html_requests"})

    res = client.post(

@@ -76,7 +76,7 @@ def run_filter_test(client, content_filter):
              "title": "my title",
              "headers": "",
              "filter_failure_notification_send": 'y',
-             "css_filter": content_filter,
+             "include_filters": content_filter,
              "fetch_backend": "html_requests"})
    res = client.post(
@@ -95,7 +95,7 @@ def run_filter_test(client, content_filter):
    time.sleep(3)

    # We should see something in the frontend
-   assert b'Warning, filter' in res.data
+   assert b'Warning, no filters were found' in res.data

    # Now it should exist and contain our "filter not found" alert
    assert os.path.isfile("test-datastore/notification.txt")
@@ -131,7 +131,7 @@ def run_filter_test(client, content_filter):
 def test_setup(live_server):
     live_server_setup(live_server)

-def test_check_css_filter_failure_notification(client, live_server):
+def test_check_include_filters_failure_notification(client, live_server):
     set_original_response()
     time.sleep(1)
     run_filter_test(client, '#nope-doesnt-exist')

@@ -132,7 +132,7 @@ def set_original_response():
     return None

-def set_response_with_html():
+def set_json_response_with_html():
     test_return_data = """
     {
       "test": [
@@ -176,7 +176,7 @@ def set_modified_response():
 def test_check_json_without_filter(client, live_server):
     # Request a JSON document from a application/json source containing HTML
     # and be sure it doesn't get chewed up by instriptis
-    set_response_with_html()
+    set_json_response_with_html()

     # Give the endpoint time to spin up
     time.sleep(1)
@@ -189,9 +189,6 @@ def test_check_json_without_filter(client, live_server):
        follow_redirects=True
    )

-   # Trigger a check
-   client.get(url_for("form_watch_checknow"), follow_redirects=True)
-
    # Give the thread time to pick it up
    time.sleep(3)
@@ -200,6 +197,7 @@ def test_check_json_without_filter(client, live_server):
        follow_redirects=True
    )

+   # Should still see '"html": "<b>"'
    assert b'&#34;&lt;b&gt;' in res.data
    assert res.data.count(b'{\n') >= 2
@@ -221,9 +219,6 @@ def check_json_filter(json_filter, client, live_server):
    )
    assert b"1 Imported" in res.data

-   # Trigger a check
-   client.get(url_for("form_watch_checknow"), follow_redirects=True)
-
    # Give the thread time to pick it up
    time.sleep(3)
@@ -231,7 +226,7 @@ def check_json_filter(json_filter, client, live_server):
    # Add our URL to the import page
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": json_filter,
+       data={"include_filters": json_filter,
              "url": test_url,
              "tag": "",
              "headers": "",
@@ -247,9 +242,6 @@ def check_json_filter(json_filter, client, live_server):
    )
    assert bytes(escape(json_filter).encode('utf-8')) in res.data

-   # Trigger a check
-   client.get(url_for("form_watch_checknow"), follow_redirects=True)
-
    # Give the thread time to pick it up
    time.sleep(3)
    #  Make a change
@@ -301,7 +293,7 @@ def check_json_filter_bool_val(json_filter, client, live_server):
    # Add our URL to the import page
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": json_filter,
+       data={"include_filters": json_filter,
              "url": test_url,
              "tag": "",
              "headers": "",
@@ -311,11 +303,6 @@ def check_json_filter_bool_val(json_filter, client, live_server):
    )
    assert b"Updated watch." in res.data

-   time.sleep(3)
-
-   # Trigger a check
-   client.get(url_for("form_watch_checknow"), follow_redirects=True)
-
    # Give the thread time to pick it up
    time.sleep(3)
    #  Make a change
@@ -360,9 +347,6 @@ def check_json_ext_filter(json_filter, client, live_server):
    )
    assert b"1 Imported" in res.data

-   # Trigger a check
-   client.get(url_for("form_watch_checknow"), follow_redirects=True)
-
    # Give the thread time to pick it up
    time.sleep(3)
@@ -370,7 +354,7 @@ def check_json_ext_filter(json_filter, client, live_server):
    # Add our URL to the import page
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": json_filter,
+       data={"include_filters": json_filter,
              "url": test_url,
              "tag": "",
              "headers": "",
@@ -386,9 +370,6 @@ def check_json_ext_filter(json_filter, client, live_server):
    )
    assert bytes(escape(json_filter).encode('utf-8')) in res.data

-   # Trigger a check
-   client.get(url_for("form_watch_checknow"), follow_redirects=True)
-
    # Give the thread time to pick it up
    time.sleep(3)
    #  Make a change

@@ -14,7 +14,7 @@ def test_share_watch(client, live_server):
    live_server_setup(live_server)

    test_url = url_for('test_endpoint', _external=True)
-   css_filter = ".nice-filter"
+   include_filters = ".nice-filter"

    # Add our URL to the import page
    res = client.post(
@@ -29,7 +29,7 @@ def test_share_watch(client, live_server):
    # Add our URL to the import page
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": css_filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
+       data={"include_filters": include_filters, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
        follow_redirects=True
    )
    assert b"Updated watch." in res.data
@@ -37,7 +37,7 @@ def test_share_watch(client, live_server):
    res = client.get(
        url_for("edit_page", uuid="first"),
    )
-   assert bytes(css_filter.encode('utf-8')) in res.data
+   assert bytes(include_filters.encode('utf-8')) in res.data

    # click share the link
    res = client.get(
@@ -73,4 +73,8 @@ def test_share_watch(client, live_server):
    res = client.get(
        url_for("edit_page", uuid="first"),
    )
-   assert bytes(css_filter.encode('utf-8')) in res.data
+   assert bytes(include_filters.encode('utf-8')) in res.data
+
+   # Check it saved the URL
+   res = client.get(url_for("index"))
+   assert bytes(test_url.encode('utf-8')) in res.data

@@ -57,10 +57,9 @@ def test_check_basic_change_detection_functionality_source(client, live_server):

+# `subtractive_selectors` should still work in `source:` type requests
 def test_check_ignore_elements(client, live_server):
     set_original_response()
     time.sleep(2)
     test_url = 'source:'+url_for('test_endpoint', _external=True)
     # Add our URL to the import page
@@ -77,9 +76,9 @@ def test_check_ignore_elements(client, live_server):
    #####################
    # We want <span> and <p> ONLY, but ignore span with .foobar-detection
-   res = client.post(
+   client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": 'span,p', "url": test_url, "tag": "", "subtractive_selectors": ".foobar-detection", 'fetch_backend': "html_requests"},
+       data={"include_filters": 'span,p', "url": test_url, "tag": "", "subtractive_selectors": ".foobar-detection", 'fetch_backend': "html_requests"},
        follow_redirects=True
    )
@@ -89,7 +88,6 @@ def test_check_ignore_elements(client, live_server):
        url_for("preview_page", uuid="first"),
        follow_redirects=True
    )
-
    assert b'foobar-detection' not in res.data
    assert b'&lt;br' not in res.data
    assert b'&lt;p' in res.data

@@ -49,7 +49,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server):
        url_for("edit_page", uuid="first"),
        data={"trigger_text": "/cool.stuff/",
              "url": test_url,
-             "css_filter": '#in-here',
+             "include_filters": '#in-here',
              "fetch_backend": "html_requests"},
        follow_redirects=True
    )

@@ -22,7 +22,7 @@ def test_check_watch_field_storage(client, live_server):
        url_for("edit_page", uuid="first"),
        data={ "notification_urls": "json://127.0.0.1:30000\r\njson://128.0.0.1\r\n",
               "time_between_check-minutes": 126,
-              "css_filter" : ".fooclass",
+              "include_filters" : ".fooclass",
               "title" : "My title",
               "ignore_text" : "ignore this",
               "url": test_url,

@@ -89,7 +89,7 @@ def test_check_xpath_filter_utf8(client, live_server):
    time.sleep(1)
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
+       data={"include_filters": filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
        follow_redirects=True
    )
    assert b"Updated watch." in res.data
@@ -143,7 +143,7 @@ def test_check_xpath_text_function_utf8(client, live_server):
    time.sleep(1)
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
+       data={"include_filters": filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
        follow_redirects=True
    )
    assert b"Updated watch." in res.data
@@ -182,9 +182,6 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
    )
    assert b"1 Imported" in res.data

-   # Trigger a check
-   client.get(url_for("form_watch_checknow"), follow_redirects=True)
-
    # Give the thread time to pick it up
    time.sleep(sleep_time_for_fetch_thread)
@@ -192,7 +189,7 @@ def test_check_markup_xpath_filter_restriction(client, live_server):
    # Add our URL to the import page
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": xpath_filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
+       data={"include_filters": xpath_filter, "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
        follow_redirects=True
    )
    assert b"Updated watch." in res.data
@@ -230,10 +227,11 @@ def test_xpath_validation(client, live_server):
        follow_redirects=True
    )
    assert b"1 Imported" in res.data
+   time.sleep(2)

    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": "/something horrible", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
+       data={"include_filters": "/something horrible", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
        follow_redirects=True
    )
    assert b"is not a valid XPath expression" in res.data
@@ -242,7 +240,7 @@ def test_xpath_validation(client, live_server):
 # actually only really used by the distll.io importer, but could be handy too
-def test_check_with_prefix_css_filter(client, live_server):
+def test_check_with_prefix_include_filters(client, live_server):
     res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
     assert b'Deleted' in res.data
@@ -263,7 +261,7 @@ def test_check_with_prefix_css_filter(client, live_server):
    res = client.post(
        url_for("edit_page", uuid="first"),
-       data={"css_filter": "xpath://*[contains(@class, 'sametext')]", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
+       data={"include_filters": "xpath://*[contains(@class, 'sametext')]", "url": test_url, "tag": "", "headers": "", 'fetch_backend': "html_requests"},
        follow_redirects=True
    )

@@ -4,7 +4,7 @@ import queue
 import time

 from changedetectionio import content_fetcher
-from changedetectionio.html_tools import FilterNotFoundInResponse
+from changedetectionio.fetch_site_status import FilterNotFoundInResponse

 # A single update worker
 #
@@ -91,8 +91,8 @@ class update_worker(threading.Thread):
            return

        n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
-                   'notification_body': "Your configured CSS/xPath filter of '{}' for {{watch_url}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{base_url}}/edit/{{watch_uuid}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format(
-                       watch['css_filter'],
+                   'notification_body': "Your configured CSS/xPath filters of '{}' for {{watch_url}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{base_url}}/edit/{{watch_uuid}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format(
+                       ", ".join(watch['include_filters']),
                        threshold),
                    'notification_format': 'text'}
@@ -189,7 +189,7 @@ class update_worker(threading.Thread):
                    if not self.datastore.data['watching'].get(uuid):
                        continue

-                   err_text = "Warning, filter '{}' not found".format(str(e))
+                   err_text = "Warning, no filters were found, no change detection ran."
                    self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                                       # So that we get a trigger when the content is added again
                                                                       'previous_md5': ''})
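With the field now a list, the notification body simply joins the rules for display:

```python
watch = {'include_filters': ['#price', '.stock-level']}  # illustrative
print(", ".join(watch['include_filters']))
# #price, .stock-level
```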
