Backend - Regular expression / string filtering refactor for Python 3.11 and deprecation warnings since Python 3.6 (#1786)

commit 2b948c15c1 (parent 34f2d30968, branch pull/1781/head^2)
dgtlmoon committed 1 year ago via GitHub
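For context: since Python 3.6 the re module has emitted a DeprecationWarning for inline flags that are not at the start of a pattern, and Python 3.11 turns this into a hard error, which is what broke the old "append the flags at the end" approach. A minimal illustration, not part of the commit:

    import re

    # Fine on all versions: the inline flag leads the pattern
    re.compile(r"(?i)reports.+?2022")

    # DeprecationWarning on Python 3.6-3.10; on 3.11+ this raises
    # re.error: global flags not at the start of the expression
    re.compile(r"reports.+?2022(?i)")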

@@ -10,6 +10,7 @@ import re
 # HTML added to be sure each result matching a filter (.example) gets converted to a new line by Inscriptis
 TEXT_FILTER_LIST_LINE_SUFFIX = "<br>"
+PERL_STYLE_REGEX = r'^/(.*?)/([a-z]*)?$'
 # 'price' , 'lowPrice', 'highPrice' are usually under here
 # all of those may or may not appear on different websites
 LD_JSON_PRODUCT_OFFER_SELECTOR = "json:$..offers"
@@ -18,6 +19,22 @@ class JSONNotFound(ValueError):
     def __init__(self, msg):
         ValueError.__init__(self, msg)
 
+
+# Doesn't look like python supports forward slash auto enclosure in re.findall
+# So convert it to inline flag "(?i)foobar" type configuration
+def perl_style_slash_enclosed_regex_to_options(regex):
+    res = re.search(PERL_STYLE_REGEX, regex, re.IGNORECASE)
+
+    if res:
+        flags = res.group(2) if res.group(2) else 'i'
+        regex = f"(?{flags}){res.group(1)}"
+    else:
+        # Fall back to just ignorecase as an option
+        regex = f"(?i){regex}"
+
+    return regex
+
+
 # Given a CSS Rule, and a blob of HTML, return the blob of HTML that matches
 def include_filters(include_filters, html_content, append_pretty_line_formatting=False):
     soup = BeautifulSoup(html_content, "html.parser")
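To see what the conversion produces, a minimal usage sketch (assuming the changedetectionio.html_tools import path used elsewhere in this commit):

    from changedetectionio import html_tools

    # '/pattern/flags' becomes an inline-flag pattern that Python 3.11 accepts
    assert html_tools.perl_style_slash_enclosed_regex_to_options('/reports.+?2022/i') == '(?i)reports.+?2022'
    # Multiple flags are carried across unchanged
    assert html_tools.perl_style_slash_enclosed_regex_to_options('/foo/ms') == '(?ms)foo'
    # Missing flags, or no slash enclosure at all, fall back to case-insensitive
    assert html_tools.perl_style_slash_enclosed_regex_to_options('/foo/') == '(?i)foo'
    assert html_tools.perl_style_slash_enclosed_regex_to_options('Out of stock') == '(?i)Out of stock'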
@@ -195,23 +212,14 @@ def strip_ignore_text(content, wordlist, mode="content"):
     output = []
     ignore_text = []
     ignore_regex = []
     ignored_line_numbers = []
 
     for k in wordlist:
         # Is it a regex?
-        x = re.search('^\/(.*)\/(.*)', k.strip())
-        if x:
-            # Starts with / but doesn't look like a regex
-            p = x.group(1)
-            try:
-                # @Todo python regex options can go before the regex str, but not really many of the options apply on a per-line basis
-                ignore_regex.append(re.compile(rf"{p}", re.IGNORECASE))
-            except Exception as e:
-                # Badly formed regex, treat as text
-                ignore_text.append(k.strip())
+        res = re.search(PERL_STYLE_REGEX, k, re.IGNORECASE)
+        if res:
+            ignore_regex.append(re.compile(perl_style_slash_enclosed_regex_to_options(k)))
         else:
-            # Had a / but doesn't work as regex
             ignore_text.append(k.strip())
 
     for line in content.splitlines():
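The net effect on ignore/trigger word lists: only properly slash-enclosed entries are treated as regexes (compiled through the shared helper), everything else falls back to plain-text matching. A small classification sketch; the sample wordlist is hypothetical:

    import re

    PERL_STYLE_REGEX = r'^/(.*?)/([a-z]*)?$'

    for k in ['/ad.+?banner/i', 'Out of stock', '/only one slash']:
        if re.search(PERL_STYLE_REGEX, k, re.IGNORECASE):
            print(f"{k!r} -> compiled as a regex")
        else:
            # 'Out of stock' and '/only one slash' both land here:
            # without a closing slash an entry is not Perl-style
            print(f"{k!r} -> kept as plain ignore text")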

@@ -11,17 +11,19 @@ from changedetectionio import content_fetcher, html_tools
 from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT
 from copy import deepcopy
 from . import difference_detection_processor
+from ..html_tools import PERL_STYLE_REGEX
 
 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 
-name = 'Webpage Text/HTML, JSON and PDF changes'
+name = 'Webpage Text/HTML, JSON and PDF changes'
 description = 'Detects all text changes where possible'
 
+
 class FilterNotFoundInResponse(ValueError):
     def __init__(self, msg):
         ValueError.__init__(self, msg)
 
+
 class PDFToHTMLToolNotFound(ValueError):
     def __init__(self, msg):
         ValueError.__init__(self, msg)
@@ -37,19 +39,6 @@ class perform_site_check(difference_detection_processor):
         super().__init__(*args, **kwargs)
         self.datastore = datastore
 
-    # Doesn't look like python supports forward slash auto enclosure in re.findall
-    # So convert it to inline flag "foobar(?i)" type configuration
-    def forward_slash_enclosed_regex_to_options(self, regex):
-        res = re.search(r'^/(.*?)/(\w+)$', regex, re.IGNORECASE)
-
-        if res:
-            regex = res.group(1)
-            regex += '(?{})'.format(res.group(2))
-        else:
-            regex += '(?{})'.format('i')
-
-        return regex
-
     def run(self, uuid, skip_when_checksum_same=True, preferred_proxy=None):
         changed_detected = False
         screenshot = False  # as bytes
@@ -135,7 +124,8 @@ class perform_site_check(difference_detection_processor):
         # requests for PDF's, images etc should be passed the is_binary flag
         is_binary = watch.is_pdf
 
-        fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_codes, watch.get('include_filters'), is_binary=is_binary)
+        fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_codes, watch.get('include_filters'),
+                    is_binary=is_binary)
         fetcher.quit()
 
         self.screenshot = fetcher.screenshot
@@ -151,7 +141,6 @@ class perform_site_check(difference_detection_processor):
             if update_obj['previous_md5_before_filters'] == watch.get('previous_md5_before_filters'):
                 raise content_fetcher.checksumFromPreviousCheckWasTheSame()
 
-
         # Fetching complete, now filters
         # @todo move to class / maybe inside of fetcher abstract base?
@@ -231,8 +220,6 @@ class perform_site_check(difference_detection_processor):
                     stripped_text_from_html += html_tools.extract_json_as_string(content=fetcher.content, json_filter=filter)
                 is_html = False
 
-
-
         if is_html or is_source:
             # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
@@ -283,7 +270,6 @@ class perform_site_check(difference_detection_processor):
         # Re #340 - return the content before the 'ignore text' was applied
         text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')
 
-
         # @todo whitespace coming from missing rtrim()?
         # stripped_text_from_html could be based on their preferences, replace the processed text with only that which they want to know about.
         # Rewrites the processing text based on only what diff result they want to see
@@ -293,13 +279,13 @@ class perform_site_check(difference_detection_processor):
             # needs to not include (added) etc or it may get used twice
             # Replace the processed text with the preferred result
             rendered_diff = diff.render_diff(previous_version_file_contents=watch.get_last_fetched_before_filters(),
-                                                newest_version_file_contents=stripped_text_from_html,
-                                                include_equal=False,  # not the same lines
-                                                include_added=watch.get('filter_text_added', True),
-                                                include_removed=watch.get('filter_text_removed', True),
-                                                include_replaced=watch.get('filter_text_replaced', True),
-                                                line_feed_sep="\n",
-                                                include_change_type_prefix=False)
+                                             newest_version_file_contents=stripped_text_from_html,
+                                             include_equal=False,  # not the same lines
+                                             include_added=watch.get('filter_text_added', True),
+                                             include_removed=watch.get('filter_text_removed', True),
+                                             include_replaced=watch.get('filter_text_replaced', True),
+                                             line_feed_sep="\n",
+                                             include_change_type_prefix=False)
 
             watch.save_last_fetched_before_filters(text_content_before_ignored_filter)
@@ -340,16 +326,25 @@ class perform_site_check(difference_detection_processor):
             regex_matched_output = []
             for s_re in extract_text:
                 # incase they specified something in '/.../x'
-                regex = self.forward_slash_enclosed_regex_to_options(s_re)
-                result = re.findall(regex.encode('utf-8'), stripped_text_from_html)
-
-                for l in result:
-                    if type(l) is tuple:
-                        # @todo - some formatter option default (between groups)
-                        regex_matched_output += list(l) + [b'\n']
-                    else:
-                        # @todo - some formatter option default (between each ungrouped result)
-                        regex_matched_output += [l] + [b'\n']
+                if re.search(PERL_STYLE_REGEX, s_re, re.IGNORECASE):
+                    regex = html_tools.perl_style_slash_enclosed_regex_to_options(s_re)
+                    result = re.findall(regex.encode('utf-8'), stripped_text_from_html)
+
+                    for l in result:
+                        if type(l) is tuple:
+                            # @todo - some formatter option default (between groups)
+                            regex_matched_output += list(l) + [b'\n']
+                        else:
+                            # @todo - some formatter option default (between each ungrouped result)
+                            regex_matched_output += [l] + [b'\n']
+                else:
+                    # Doesn't look like a regex, just hunt for plaintext and return that which matches
+                    # `stripped_text_from_html` will be bytes, so we must encode s_re to bytes as well
+                    r = re.compile(re.escape(s_re.encode('utf-8')), re.IGNORECASE)
+                    res = r.findall(stripped_text_from_html)
+                    if res:
+                        for match in res:
+                            regex_matched_output += [match] + [b'\n']
 
             # Now we will only show what the regex matched
             stripped_text_from_html = b''
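The practical upshot of the new branch: slash-enclosed entries still go through re.findall after flag conversion, while anything else is escaped and searched as literal, case-insensitive text against the bytes content. A standalone sketch of both paths (the sample text is hypothetical):

    import re

    stripped_text_from_html = b"Weather report 2022\n1234 online now\n"

    # Perl-style entry '/report.+?(\d+)/i' converts to an inline-flag pattern
    print(re.findall(rb"(?i)report.+?(\d+)", stripped_text_from_html))  # [b'2022']

    # Plain entry: re.escape makes any metacharacters literal, bytes in, bytes out
    r = re.compile(re.escape(b"1234 online"), re.IGNORECASE)
    print(r.findall(stripped_text_from_html))  # [b'1234 online']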

@@ -378,15 +378,16 @@ Unavailable") }}
                         {{ render_field(form.extract_text, rows=5, placeholder="\d+ online") }}
                         <span class="pure-form-message-inline">
                             <ul>
-                                <li>Extracts text in the final output (line by line) after other filters using regular expressions;
+                                <li>Extracts text in the final output (line by line) after other filters using regular expressions or string match;
                                     <ul>
                                         <li>Regular expression &dash; example <code>/reports.+?2022/i</code></li>
+                                        <li>Don't forget to consider the white-space at the start of a line <code>/.+?reports.+?2022/i</code></li>
                                         <li>Use <code>//(?aiLmsux))</code> type flags (more <a href="https://docs.python.org/3/library/re.html#index-15">information here</a>)<br></li>
                                         <li>Keyword example &dash; example <code>Out of stock</code></li>
                                         <li>Use groups to extract just that text &dash; example <code>/reports.+?(\d+)/i</code> returns a list of years only</li>
                                     </ul>
                                 </li>
-                                <li>One line per regular-expression/ string match</li>
+                                <li>One line per regular-expression/string match</li>
                             </ul>
                         </span>
                     </div>
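Since the help text above promises that a capture group returns just the captured text, a quick illustration of the "list of years only" behavior (the sample text is hypothetical):

    import re

    text = b"annual reports 2021\nannual reports 2022\n"

    # '/reports.+?(\d+)/i' is converted to '(?i)reports.+?(\d+)' before matching
    print(re.findall(rb"(?i)reports.+?(\d+)", text))
    # [b'2021', b'2022'] - one entry per match, containing only the group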

@@ -2,7 +2,7 @@
 import time
 from flask import url_for
-from .util import live_server_setup
+from .util import live_server_setup, wait_for_all_checks
 from ..html_tools import *
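The tests below swap fixed time.sleep() calls for wait_for_all_checks from the shared test utilities. The real helper lives in the project's tests/util.py; purely as an assumed sketch, such a helper typically polls until no watch still reports that it is being checked:

    import time

    def wait_for_all_checks(client):
        # Hypothetical shape: poll the index page until the 'Checking now'
        # indicator disappears, rather than guessing with a fixed sleep
        for _ in range(60):
            res = client.get("/")
            if b'Checking now' not in res.data:
                break
            time.sleep(0.5)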
@@ -55,6 +55,8 @@ def set_multiline_response():
     </p>
     <div>aaand something lines</div>
+    <br>
+    <div>and this should be</div>
 </body>
 </html>
     """
@@ -66,11 +68,10 @@ def set_multiline_response():
 def test_setup(client, live_server):
     live_server_setup(live_server)
 
-
 def test_check_filter_multiline(client, live_server):
+    #live_server_setup(live_server)
     set_multiline_response()
 
     # Add our URL to the import page
@@ -82,14 +83,15 @@ def test_check_filter_multiline(client, live_server):
     )
     assert b"1 Imported" in res.data
 
-    time.sleep(3)
+    wait_for_all_checks(client)
 
     # Goto the edit page, add our ignore text
    # Add our URL to the import page
     res = client.post(
         url_for("edit_page", uuid="first"),
         data={"include_filters": '',
-              'extract_text': '/something.+?6 billion.+?lines/si',
+              # Test a regex and a plaintext
+              'extract_text': '/something.+?6 billion.+?lines/si\r\nand this should be',
               "url": test_url,
               "tags": "",
               "headers": "",
@@ -99,13 +101,19 @@ def test_check_filter_multiline(client, live_server):
     )
     assert b"Updated watch." in res.data
 
-    time.sleep(3)
+    wait_for_all_checks(client)
+
+    res = client.get(url_for("index"))
+
+    # Issue 1828
+    assert b'not at the start of the expression' not in res.data
 
     res = client.get(
         url_for("preview_page", uuid="first"),
         follow_redirects=True
     )
 
+    # Plaintext that doesn't look like a regex should match also
+    assert b'and this should be' in res.data
+
     assert b'<div class="">Something' in res.data
     assert b'<div class="">across 6 billion multiple' in res.data
@@ -115,14 +123,11 @@ def test_check_filter_multiline(client, live_server):
     assert b'aaand something lines' not in res.data
 
 def test_check_filter_and_regex_extract(client, live_server):
-    sleep_time_for_fetch_thread = 3
     include_filters = ".changetext"
 
     set_original_response()
 
-    # Give the endpoint time to spin up
-    time.sleep(1)
-
     # Add our URL to the import page
     test_url = url_for('test_endpoint', _external=True)
     res = client.post(
@@ -132,19 +137,15 @@ def test_check_filter_and_regex_extract(client, live_server):
     )
     assert b"1 Imported" in res.data
 
-    time.sleep(1)
-    # Trigger a check
-    client.get(url_for("form_watch_checknow"), follow_redirects=True)
-
     # Give the thread time to pick it up
-    time.sleep(sleep_time_for_fetch_thread)
+    wait_for_all_checks(client)
 
     # Goto the edit page, add our ignore text
     # Add our URL to the import page
     res = client.post(
         url_for("edit_page", uuid="first"),
         data={"include_filters": include_filters,
-              'extract_text': '\d+ online\r\n\d+ guests\r\n/somecase insensitive \d+/i\r\n/somecase insensitive (345\d)/i',
+              'extract_text': '/\d+ online/\r\n/\d+ guests/\r\n/somecase insensitive \d+/i\r\n/somecase insensitive (345\d)/i\r\n/issue1828.+?2022/i',
               "url": test_url,
               "tags": "",
               "headers": "",
@@ -155,8 +156,13 @@ def test_check_filter_and_regex_extract(client, live_server):
     assert b"Updated watch." in res.data
 
     # Give the thread time to pick it up
-    time.sleep(sleep_time_for_fetch_thread)
+    wait_for_all_checks(client)
+
+    res = client.get(url_for("index"))
+
+    # issue 1828
+    assert b'not at the start of the expression' not in res.data
 
     # Make a change
     set_modified_response()
@@ -164,7 +170,7 @@ def test_check_filter_and_regex_extract(client, live_server):
     # Trigger a check
     client.get(url_for("form_watch_checknow"), follow_redirects=True)
 
     # Give the thread time to pick it up
-    time.sleep(sleep_time_for_fetch_thread)
+    wait_for_all_checks(client)
 
     # It should have 'unviewed' still
     # Because it should be looking at only that 'sametext' id

@@ -2,7 +2,7 @@
 import time
 from flask import url_for
-from . util import live_server_setup
+from .util import live_server_setup, wait_for_all_checks
 
 
 def set_original_ignore_response():
@@ -26,13 +26,8 @@ def test_trigger_regex_functionality(client, live_server):
     live_server_setup(live_server)
 
-    sleep_time_for_fetch_thread = 3
-
     set_original_ignore_response()
 
-    # Give the endpoint time to spin up
-    time.sleep(1)
-
     # Add our URL to the import page
     test_url = url_for('test_endpoint', _external=True)
     res = client.post(
@@ -43,7 +38,7 @@ def test_trigger_regex_functionality(client, live_server):
     assert b"1 Imported" in res.data
 
     # Give the thread time to pick it up
-    time.sleep(sleep_time_for_fetch_thread)
+    wait_for_all_checks(client)
 
     # It should report nothing found (just a new one shouldnt have anything)
     res = client.get(url_for("index"))
@@ -57,7 +52,7 @@ def test_trigger_regex_functionality(client, live_server):
               "fetch_backend": "html_requests"},
         follow_redirects=True
     )
-    time.sleep(sleep_time_for_fetch_thread)
+    wait_for_all_checks(client)
 
     # so that we set the state to 'unviewed' after all the edits
     client.get(url_for("diff_history_page", uuid="first"))
@@ -65,7 +60,7 @@ def test_trigger_regex_functionality(client, live_server):
         f.write("some new noise")
 
     client.get(url_for("form_watch_checknow"), follow_redirects=True)
-    time.sleep(sleep_time_for_fetch_thread)
+    wait_for_all_checks(client)
 
     # It should report nothing found (nothing should match the regex)
     res = client.get(url_for("index"))
@@ -75,7 +70,7 @@ def test_trigger_regex_functionality(client, live_server):
         f.write("regex test123<br>\nsomething 123")
 
     client.get(url_for("form_watch_checknow"), follow_redirects=True)
-    time.sleep(sleep_time_for_fetch_thread)
+    wait_for_all_checks(client)
 
     res = client.get(url_for("index"))
     assert b'unviewed' in res.data
