Merge branch 'master' into extra-filters

extra-filters
dgtlmoon 3 months ago
commit 515b1bc87f

@ -0,0 +1,78 @@
# include the decorator
from apprise.decorators import notify
@notify(on="delete")
@notify(on="deletes")
@notify(on="get")
@notify(on="gets")
@notify(on="post")
@notify(on="posts")
@notify(on="put")
@notify(on="puts")
def apprise_custom_api_call_wrapper(body, title, notify_type, *args, **kwargs):
import requests
import json
from apprise.utils import parse_url as apprise_parse_url
from apprise import URLBase
url = kwargs['meta'].get('url')
if url.startswith('post'):
r = requests.post
elif url.startswith('get'):
r = requests.get
elif url.startswith('put'):
r = requests.put
elif url.startswith('delete'):
r = requests.delete
url = url.replace('post://', 'http://')
url = url.replace('posts://', 'https://')
url = url.replace('put://', 'http://')
url = url.replace('puts://', 'https://')
url = url.replace('get://', 'http://')
url = url.replace('gets://', 'https://')
url = url.replace('put://', 'http://')
url = url.replace('puts://', 'https://')
url = url.replace('delete://', 'http://')
url = url.replace('deletes://', 'https://')
headers = {}
params = {}
auth = None
# Convert /foobar?+some-header=hello to proper header dictionary
results = apprise_parse_url(url)
if results:
# Add our headers that the user can potentially over-ride if they wish
# to to our returned result set and tidy entries by unquoting them
headers = {URLBase.unquote(x): URLBase.unquote(y)
for x, y in results['qsd+'].items()}
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
# In Apprise, it relies on prefixing each request arg with "-", because it uses say &method=update as a flag for apprise
# but here we are making straight requests, so we need todo convert this against apprise's logic
for k, v in results['qsd'].items():
if not k.strip('+-') in results['qsd+'].keys():
params[URLBase.unquote(k)] = URLBase.unquote(v)
# Determine Authentication
auth = ''
if results.get('user') and results.get('password'):
auth = (URLBase.unquote(results.get('user')), URLBase.unquote(results.get('user')))
elif results.get('user'):
auth = (URLBase.unquote(results.get('user')))
# Try to auto-guess if it's JSON
try:
json.loads(body)
headers['Content-Type'] = 'application/json; charset=utf-8'
except ValueError as e:
pass
r(results.get('url'),
auth=auth,
data=body.encode('utf-8') if type(body) is str else body,
headers=headers,
params=params
)

@ -25,6 +25,7 @@ browser_step_ui_config = {'Choose one': '0 0',
'Click element if exists': '1 0', 'Click element if exists': '1 0',
'Click element': '1 0', 'Click element': '1 0',
'Click element containing text': '0 1', 'Click element containing text': '0 1',
'Click element containing text if exists': '0 1',
'Enter text in field': '1 1', 'Enter text in field': '1 1',
'Execute JS': '0 1', 'Execute JS': '0 1',
# 'Extract text and use as filter': '1 0', # 'Extract text and use as filter': '1 0',
@ -96,12 +97,24 @@ class steppable_browser_interface():
return self.action_goto_url(value=self.start_url) return self.action_goto_url(value=self.start_url)
def action_click_element_containing_text(self, selector=None, value=''): def action_click_element_containing_text(self, selector=None, value=''):
logger.debug("Clicking element containing text")
if not len(value.strip()): if not len(value.strip()):
return return
elem = self.page.get_by_text(value) elem = self.page.get_by_text(value)
if elem.count(): if elem.count():
elem.first.click(delay=randint(200, 500), timeout=3000) elem.first.click(delay=randint(200, 500), timeout=3000)
def action_click_element_containing_text_if_exists(self, selector=None, value=''):
logger.debug("Clicking element containing text if exists")
if not len(value.strip()):
return
elem = self.page.get_by_text(value)
logger.debug(f"Clicking element containing text - {elem.count()} elements found")
if elem.count():
elem.first.click(delay=randint(200, 500), timeout=3000)
else:
return
def action_enter_text_in_field(self, selector, value): def action_enter_text_in_field(self, selector, value):
if not len(selector.strip()): if not len(selector.strip()):
return return

@ -1,8 +1,6 @@
from loguru import logger from loguru import logger
import chardet
import hashlib import hashlib
import os import os
import requests
from changedetectionio import strtobool from changedetectionio import strtobool
from changedetectionio.content_fetchers.exceptions import BrowserStepsInUnsupportedFetcher, EmptyReply, Non200ErrorCodeReceived from changedetectionio.content_fetchers.exceptions import BrowserStepsInUnsupportedFetcher, EmptyReply, Non200ErrorCodeReceived
from changedetectionio.content_fetchers.base import Fetcher from changedetectionio.content_fetchers.base import Fetcher
@ -28,6 +26,9 @@ class fetcher(Fetcher):
is_binary=False, is_binary=False,
empty_pages_are_a_change=False): empty_pages_are_a_change=False):
import chardet
import requests
if self.browser_steps_get_valid_steps(): if self.browser_steps_get_valid_steps():
raise BrowserStepsInUnsupportedFetcher(url=url) raise BrowserStepsInUnsupportedFetcher(url=url)

@ -541,7 +541,8 @@ def changedetection_app(config=None, datastore_o=None):
import random import random
from .apprise_asset import asset from .apprise_asset import asset
apobj = apprise.Apprise(asset=asset) apobj = apprise.Apprise(asset=asset)
# so that the custom endpoints are registered
from changedetectionio.apprise_plugin import apprise_custom_api_call_wrapper
is_global_settings_form = request.args.get('mode', '') == 'global-settings' is_global_settings_form = request.args.get('mode', '') == 'global-settings'
is_group_settings_form = request.args.get('mode', '') == 'group-settings' is_group_settings_form = request.args.get('mode', '') == 'group-settings'

@ -221,7 +221,8 @@ class ValidateAppRiseServers(object):
def __call__(self, form, field): def __call__(self, form, field):
import apprise import apprise
apobj = apprise.Apprise() apobj = apprise.Apprise()
# so that the custom endpoints are registered
from changedetectionio.apprise_plugin import apprise_custom_api_call_wrapper
for server_url in field.data: for server_url in field.data:
if not apobj.add(server_url): if not apobj.add(server_url):
message = field.gettext('\'%s\' is not a valid AppRise URL.' % (server_url)) message = field.gettext('\'%s\' is not a valid AppRise URL.' % (server_url))

@ -1,10 +1,4 @@
from bs4 import BeautifulSoup
from inscriptis import get_text
from jsonpath_ng.ext import parse
from typing import List from typing import List
from inscriptis.model.config import ParserConfig
from xml.sax.saxutils import escape as xml_escape
import json import json
import re import re
@ -39,6 +33,7 @@ def perl_style_slash_enclosed_regex_to_options(regex):
# Given a CSS Rule, and a blob of HTML, return the blob of HTML that matches # Given a CSS Rule, and a blob of HTML, return the blob of HTML that matches
def include_filters(include_filters, html_content, append_pretty_line_formatting=False): def include_filters(include_filters, html_content, append_pretty_line_formatting=False):
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser") soup = BeautifulSoup(html_content, "html.parser")
html_block = "" html_block = ""
r = soup.select(include_filters, separator="") r = soup.select(include_filters, separator="")
@ -56,6 +51,7 @@ def include_filters(include_filters, html_content, append_pretty_line_formatting
return html_block return html_block
def subtractive_css_selector(css_selector, html_content): def subtractive_css_selector(css_selector, html_content):
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser") soup = BeautifulSoup(html_content, "html.parser")
for item in soup.select(css_selector): for item in soup.select(css_selector):
item.decompose() item.decompose()
@ -181,6 +177,7 @@ def xpath1_filter(xpath_filter, html_content, append_pretty_line_formatting=Fals
# Extract/find element # Extract/find element
def extract_element(find='title', html_content=''): def extract_element(find='title', html_content=''):
from bs4 import BeautifulSoup
#Re #106, be sure to handle when its not found #Re #106, be sure to handle when its not found
element_text = None element_text = None
@ -194,6 +191,8 @@ def extract_element(find='title', html_content=''):
# #
def _parse_json(json_data, json_filter): def _parse_json(json_data, json_filter):
from jsonpath_ng.ext import parse
if json_filter.startswith("json:"): if json_filter.startswith("json:"):
jsonpath_expression = parse(json_filter.replace('json:', '')) jsonpath_expression = parse(json_filter.replace('json:', ''))
match = jsonpath_expression.find(json_data) match = jsonpath_expression.find(json_data)
@ -242,6 +241,8 @@ def _get_stripped_text_from_json_match(match):
# json_filter - ie json:$..price # json_filter - ie json:$..price
# ensure_is_ldjson_info_type - str "product", optional, "@type == product" (I dont know how to do that as a json selector) # ensure_is_ldjson_info_type - str "product", optional, "@type == product" (I dont know how to do that as a json selector)
def extract_json_as_string(content, json_filter, ensure_is_ldjson_info_type=None): def extract_json_as_string(content, json_filter, ensure_is_ldjson_info_type=None):
from bs4 import BeautifulSoup
stripped_text_from_html = False stripped_text_from_html = False
# https://github.com/dgtlmoon/changedetection.io/pull/2041#issuecomment-1848397161w # https://github.com/dgtlmoon/changedetection.io/pull/2041#issuecomment-1848397161w
# Try to parse/filter out the JSON, if we get some parser error, then maybe it's embedded within HTML tags # Try to parse/filter out the JSON, if we get some parser error, then maybe it's embedded within HTML tags
@ -352,6 +353,7 @@ def strip_ignore_text(content, wordlist, mode="content"):
return "\n".encode('utf8').join(output) return "\n".encode('utf8').join(output)
def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False) -> str: def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False) -> str:
from xml.sax.saxutils import escape as xml_escape
pattern = '<!\[CDATA\[(\s*(?:.(?<!\]\]>)\s*)*)\]\]>' pattern = '<!\[CDATA\[(\s*(?:.(?<!\]\]>)\s*)*)\]\]>'
def repl(m): def repl(m):
text = m.group(1) text = m.group(1)
@ -360,6 +362,9 @@ def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False
return re.sub(pattern, repl, html_content) return re.sub(pattern, repl, html_content)
def html_to_text(html_content: str, render_anchor_tag_content=False, is_rss=False) -> str: def html_to_text(html_content: str, render_anchor_tag_content=False, is_rss=False) -> str:
from inscriptis import get_text
from inscriptis.model.config import ParserConfig
"""Converts html string to a string with just the text. If ignoring """Converts html string to a string with just the text. If ignoring
rendering anchor tag content is enable, anchor tag content are also rendering anchor tag content is enable, anchor tag content are also
included in the text included in the text

@ -1,9 +1,10 @@
import apprise
import time import time
from apprise import NotifyFormat from apprise import NotifyFormat
import json import apprise
from loguru import logger from loguru import logger
valid_tokens = { valid_tokens = {
'base_url': '', 'base_url': '',
'current_snapshot': '', 'current_snapshot': '',
@ -34,86 +35,11 @@ valid_notification_formats = {
default_notification_format_for_watch: default_notification_format_for_watch default_notification_format_for_watch: default_notification_format_for_watch
} }
# include the decorator
from apprise.decorators import notify
@notify(on="delete")
@notify(on="deletes")
@notify(on="get")
@notify(on="gets")
@notify(on="post")
@notify(on="posts")
@notify(on="put")
@notify(on="puts")
def apprise_custom_api_call_wrapper(body, title, notify_type, *args, **kwargs):
import requests
from apprise.utils import parse_url as apprise_parse_url
from apprise import URLBase
url = kwargs['meta'].get('url')
if url.startswith('post'):
r = requests.post
elif url.startswith('get'):
r = requests.get
elif url.startswith('put'):
r = requests.put
elif url.startswith('delete'):
r = requests.delete
url = url.replace('post://', 'http://')
url = url.replace('posts://', 'https://')
url = url.replace('put://', 'http://')
url = url.replace('puts://', 'https://')
url = url.replace('get://', 'http://')
url = url.replace('gets://', 'https://')
url = url.replace('put://', 'http://')
url = url.replace('puts://', 'https://')
url = url.replace('delete://', 'http://')
url = url.replace('deletes://', 'https://')
headers = {}
params = {}
auth = None
# Convert /foobar?+some-header=hello to proper header dictionary
results = apprise_parse_url(url)
if results:
# Add our headers that the user can potentially over-ride if they wish
# to to our returned result set and tidy entries by unquoting them
headers = {URLBase.unquote(x): URLBase.unquote(y)
for x, y in results['qsd+'].items()}
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
# In Apprise, it relies on prefixing each request arg with "-", because it uses say &method=update as a flag for apprise
# but here we are making straight requests, so we need todo convert this against apprise's logic
for k, v in results['qsd'].items():
if not k.strip('+-') in results['qsd+'].keys():
params[URLBase.unquote(k)] = URLBase.unquote(v)
# Determine Authentication
auth = ''
if results.get('user') and results.get('password'):
auth = (URLBase.unquote(results.get('user')), URLBase.unquote(results.get('user')))
elif results.get('user'):
auth = (URLBase.unquote(results.get('user')))
# Try to auto-guess if it's JSON
try:
json.loads(body)
headers['Content-Type'] = 'application/json; charset=utf-8'
except ValueError as e:
pass
r(results.get('url'),
auth=auth,
data=body.encode('utf-8') if type(body) is str else body,
headers=headers,
params=params
)
def process_notification(n_object, datastore): def process_notification(n_object, datastore):
# so that the custom endpoints are registered
from changedetectionio.apprise_plugin import apprise_custom_api_call_wrapper
from .safe_jinja import render as jinja_render from .safe_jinja import render as jinja_render
now = time.time() now = time.time()

@ -2,8 +2,7 @@ from .. import difference_detection_processor
from ..exceptions import ProcessorException from ..exceptions import ProcessorException
from . import Restock from . import Restock
from loguru import logger from loguru import logger
import hashlib
import re
import urllib3 import urllib3
import time import time
@ -27,6 +26,25 @@ def _search_prop_by_value(matches, value):
if value in prop[0]: if value in prop[0]:
return prop[1] # Yield the desired value and exit the function return prop[1] # Yield the desired value and exit the function
def _deduplicate_prices(data):
seen = set()
unique_data = []
for datum in data:
# Convert 'value' to float if it can be a numeric string, otherwise leave it as is
try:
normalized_value = float(datum.value) if isinstance(datum.value, str) and datum.value.replace('.', '', 1).isdigit() else datum.value
except ValueError:
normalized_value = datum.value
# If the normalized value hasn't been seen yet, add it to unique data
if normalized_value not in seen:
unique_data.append(datum)
seen.add(normalized_value)
return unique_data
# should return Restock() # should return Restock()
# add casting? # add casting?
def get_itemprop_availability(html_content) -> Restock: def get_itemprop_availability(html_content) -> Restock:
@ -36,6 +54,7 @@ def get_itemprop_availability(html_content) -> Restock:
""" """
from jsonpath_ng import parse from jsonpath_ng import parse
import re
now = time.time() now = time.time()
import extruct import extruct
logger.trace(f"Imported extruct module in {time.time() - now:.3f}s") logger.trace(f"Imported extruct module in {time.time() - now:.3f}s")
@ -60,7 +79,7 @@ def get_itemprop_availability(html_content) -> Restock:
pricecurrency_parse = parse('$..(pricecurrency|currency|priceCurrency )') pricecurrency_parse = parse('$..(pricecurrency|currency|priceCurrency )')
availability_parse = parse('$..(availability|Availability)') availability_parse = parse('$..(availability|Availability)')
price_result = price_parse.find(data) price_result = _deduplicate_prices(price_parse.find(data))
if price_result: if price_result:
# Right now, we just support single product items, maybe we will store the whole actual metadata seperately in teh future and # Right now, we just support single product items, maybe we will store the whole actual metadata seperately in teh future and
# parse that for the UI? # parse that for the UI?
@ -122,6 +141,10 @@ class perform_site_check(difference_detection_processor):
xpath_data = None xpath_data = None
def run_changedetection(self, watch, skip_when_checksum_same=True): def run_changedetection(self, watch, skip_when_checksum_same=True):
import hashlib
from concurrent.futures import ProcessPoolExecutor
from functools import partial
if not watch: if not watch:
raise Exception("Watch no longer exists.") raise Exception("Watch no longer exists.")
@ -149,7 +172,11 @@ class perform_site_check(difference_detection_processor):
itemprop_availability = {} itemprop_availability = {}
try: try:
itemprop_availability = get_itemprop_availability(html_content=self.fetcher.content) with ProcessPoolExecutor() as executor:
# Use functools.partial to create a callable with arguments
# anything using bs4/lxml etc is quite "leaky"
future = executor.submit(partial(get_itemprop_availability, self.fetcher.content))
itemprop_availability = future.result()
except MoreThanOnePriceFound as e: except MoreThanOnePriceFound as e:
# Add the real data # Add the real data
raise ProcessorException(message="Cannot run, more than one price detected, this plugin is only for product pages with ONE product, try the content-change detection mode.", raise ProcessorException(message="Cannot run, more than one price detected, this plugin is only for product pages with ONE product, try the content-change detection mode.",

@ -36,6 +36,9 @@ class PDFToHTMLToolNotFound(ValueError):
class perform_site_check(difference_detection_processor): class perform_site_check(difference_detection_processor):
def run_changedetection(self, watch, skip_when_checksum_same=True): def run_changedetection(self, watch, skip_when_checksum_same=True):
from concurrent.futures import ProcessPoolExecutor
from functools import partial
changed_detected = False changed_detected = False
html_content = "" html_content = ""
screenshot = False # as bytes screenshot = False # as bytes
@ -171,20 +174,30 @@ class perform_site_check(difference_detection_processor):
for filter_rule in include_filters_rule: for filter_rule in include_filters_rule:
# For HTML/XML we offer xpath as an option, just start a regular xPath "/.." # For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
if filter_rule[0] == '/' or filter_rule.startswith('xpath:'): if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
html_content += html_tools.xpath_filter(xpath_filter=filter_rule.replace('xpath:', ''), with ProcessPoolExecutor() as executor:
# Use functools.partial to create a callable with arguments - anything using bs4/lxml etc is quite "leaky"
future = executor.submit(partial(html_tools.xpath_filter, xpath_filter=filter_rule.replace('xpath:', ''),
html_content=self.fetcher.content, html_content=self.fetcher.content,
append_pretty_line_formatting=not watch.is_source_type_url, append_pretty_line_formatting=not watch.is_source_type_url,
is_rss=is_rss) is_rss=is_rss))
html_content += future.result()
elif filter_rule.startswith('xpath1:'): elif filter_rule.startswith('xpath1:'):
html_content += html_tools.xpath1_filter(xpath_filter=filter_rule.replace('xpath1:', ''), with ProcessPoolExecutor() as executor:
# Use functools.partial to create a callable with arguments - anything using bs4/lxml etc is quite "leaky"
future = executor.submit(partial(html_tools.xpath1_filter, xpath_filter=filter_rule.replace('xpath1:', ''),
html_content=self.fetcher.content, html_content=self.fetcher.content,
append_pretty_line_formatting=not watch.is_source_type_url, append_pretty_line_formatting=not watch.is_source_type_url,
is_rss=is_rss) is_rss=is_rss))
html_content += future.result()
else: else:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text with ProcessPoolExecutor() as executor:
html_content += html_tools.include_filters(include_filters=filter_rule, # Use functools.partial to create a callable with arguments - anything using bs4/lxml etc is quite "leaky"
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
future = executor.submit(partial(html_tools.include_filters, include_filters=filter_rule,
html_content=self.fetcher.content, html_content=self.fetcher.content,
append_pretty_line_formatting=not watch.is_source_type_url) append_pretty_line_formatting=not watch.is_source_type_url))
html_content += future.result()
if not html_content.strip(): if not html_content.strip():
raise FilterNotFoundInResponse(msg=include_filters_rule, screenshot=self.fetcher.screenshot, xpath_data=self.fetcher.xpath_data) raise FilterNotFoundInResponse(msg=include_filters_rule, screenshot=self.fetcher.screenshot, xpath_data=self.fetcher.xpath_data)
@ -197,12 +210,13 @@ class perform_site_check(difference_detection_processor):
else: else:
# extract text # extract text
do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False) do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
stripped_text_from_html = \ with ProcessPoolExecutor() as executor:
html_tools.html_to_text( # Use functools.partial to create a callable with arguments - anything using bs4/lxml etc is quite "leaky"
html_content=html_content, # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
future = executor.submit(partial(html_tools.html_to_text, html_content=html_content,
render_anchor_tag_content=do_anchor, render_anchor_tag_content=do_anchor,
is_rss=is_rss # #1874 activate the <title workaround hack is_rss=is_rss)) #1874 activate the <title workaround hack
) stripped_text_from_html = future.result()
# Re #340 - return the content before the 'ignore text' was applied # Re #340 - return the content before the 'ignore text' was applied
text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8') text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')

@ -11,7 +11,6 @@ from threading import Lock
import json import json
import os import os
import re import re
import requests
import secrets import secrets
import threading import threading
import time import time
@ -270,6 +269,7 @@ class ChangeDetectionStore:
self.needs_write_urgent = True self.needs_write_urgent = True
def add_watch(self, url, tag='', extras=None, tag_uuids=None, write_to_disk_now=True): def add_watch(self, url, tag='', extras=None, tag_uuids=None, write_to_disk_now=True):
import requests
if extras is None: if extras is None:
extras = {} extras = {}

@ -1,5 +1,6 @@
import os import os
import time import time
from loguru import logger
from flask import url_for from flask import url_for
from .util import set_original_response, live_server_setup, extract_UUID_from_client, wait_for_all_checks, \ from .util import set_original_response, live_server_setup, extract_UUID_from_client, wait_for_all_checks, \
wait_for_notification_endpoint_output wait_for_notification_endpoint_output
@ -27,6 +28,12 @@ def run_filter_test(client, live_server, content_filter):
# Response WITHOUT the filter ID element # Response WITHOUT the filter ID element
set_original_response() set_original_response()
# Goto the edit page, add our ignore text
notification_url = url_for('test_notification_endpoint', _external=True).replace('http', 'json')
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
# cleanup for the next # cleanup for the next
client.get( client.get(
url_for("form_delete", uuid="all"), url_for("form_delete", uuid="all"),
@ -35,85 +42,92 @@ def run_filter_test(client, live_server, content_filter):
if os.path.isfile("test-datastore/notification.txt"): if os.path.isfile("test-datastore/notification.txt"):
os.unlink("test-datastore/notification.txt") os.unlink("test-datastore/notification.txt")
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post( res = client.post(
url_for("form_quick_watch_add"), url_for("import_page"),
data={"url": test_url, "tags": ''}, data={"urls": test_url},
follow_redirects=True follow_redirects=True
) )
assert b"Watch added" in res.data assert b"1 Imported" in res.data
# Give the thread time to pick up the first version
wait_for_all_checks(client) wait_for_all_checks(client)
# Goto the edit page, add our ignore text uuid = extract_UUID_from_client(client)
# Add our URL to the import page
url = url_for('test_notification_endpoint', _external=True) assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 0, "No filter = No filter failure"
notification_url = url.replace('http', 'json')
watch_data = {"notification_urls": notification_url,
print(">>>> Notification URL: " + notification_url) "notification_title": "New ChangeDetection.io Notification - {{watch_url}}",
"notification_body": "BASE URL: {{base_url}}\n"
# Just a regular notification setting, this will be used by the special 'filter not found' notification "Watch URL: {{watch_url}}\n"
notification_form_data = {"notification_urls": notification_url, "Watch UUID: {{watch_uuid}}\n"
"notification_title": "New ChangeDetection.io Notification - {{watch_url}}", "Watch title: {{watch_title}}\n"
"notification_body": "BASE URL: {{base_url}}\n" "Watch tag: {{watch_tag}}\n"
"Watch URL: {{watch_url}}\n" "Preview: {{preview_url}}\n"
"Watch UUID: {{watch_uuid}}\n" "Diff URL: {{diff_url}}\n"
"Watch title: {{watch_title}}\n" "Snapshot: {{current_snapshot}}\n"
"Watch tag: {{watch_tag}}\n" "Diff: {{diff}}\n"
"Preview: {{preview_url}}\n" "Diff Full: {{diff_full}}\n"
"Diff URL: {{diff_url}}\n" "Diff as Patch: {{diff_patch}}\n"
"Snapshot: {{current_snapshot}}\n" ":-)",
"Diff: {{diff}}\n" "notification_format": "Text",
"Diff Full: {{diff_full}}\n" "fetch_backend": "html_requests",
"Diff as Patch: {{diff_patch}}\n" "filter_failure_notification_send": 'y',
":-)", "headers": "",
"notification_format": "Text"} "tags": "my tag",
"title": "my title 123",
notification_form_data.update({ "time_between_check-hours": 5, # So that the queue runner doesnt also put it in
"url": test_url, "url": test_url,
"tags": "my tag", }
"title": "my title 123",
"headers": "",
"filter_failure_notification_send": 'y',
"include_filters": content_filter,
"fetch_backend": "html_requests"})
# A POST here will also reset the filter failure counter (filter_failure_notification_threshold_attempts)
res = client.post( res = client.post(
url_for("edit_page", uuid="first"), url_for("edit_page", uuid=uuid),
data=notification_form_data, data=watch_data,
follow_redirects=True follow_redirects=True
) )
assert b"Updated watch." in res.data assert b"Updated watch." in res.data
wait_for_all_checks(client) wait_for_all_checks(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 0, "No filter = No filter failure"
# Now the notification should not exist, because we didnt reach the threshold # Now add a filter, because recheck hours == 5, ONLY pressing of the [edit] or [recheck all] should trigger
watch_data['include_filters'] = content_filter
res = client.post(
url_for("edit_page", uuid=uuid),
data=watch_data,
follow_redirects=True
)
assert b"Updated watch." in res.data
# It should have checked once so far and given this error (because we hit SAVE)
wait_for_all_checks(client)
assert not os.path.isfile("test-datastore/notification.txt") assert not os.path.isfile("test-datastore/notification.txt")
# Hitting [save] would have triggered a recheck, and we have a filter, so this would be ONE failure
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 1, "Should have been checked once"
# recheck it up to just before the threshold, including the fact that in the previous POST it would have rechecked (and incremented) # recheck it up to just before the threshold, including the fact that in the previous POST it would have rechecked (and incremented)
for i in range(0, App._FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT-2): # Add 4 more checks
checked = 0
ATTEMPT_THRESHOLD_SETTING = live_server.app.config['DATASTORE'].data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
for i in range(0, ATTEMPT_THRESHOLD_SETTING - 2):
checked += 1
client.get(url_for("form_watch_checknow"), follow_redirects=True) client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client) wait_for_all_checks(client)
wait_for_notification_endpoint_output() res = client.get(url_for("index"))
assert not os.path.isfile("test-datastore/notification.txt"), f"test-datastore/notification.txt should not exist - Attempt {i} when threshold is {App._FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT}" assert b'Warning, no filters were found' in res.data
assert not os.path.isfile("test-datastore/notification.txt")
# We should see something in the frontend time.sleep(1)
res = client.get(url_for("index"))
assert b'Warning, no filters were found' in res.data assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 5
time.sleep(2) time.sleep(2)
# One more check should trigger the _FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT threshold # One more check should trigger the _FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT threshold
client.get(url_for("form_watch_checknow"), follow_redirects=True) client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client) wait_for_all_checks(client)
wait_for_notification_endpoint_output() wait_for_notification_endpoint_output()
# Now it should exist and contain our "filter not found" alert # Now it should exist and contain our "filter not found" alert
assert os.path.isfile("test-datastore/notification.txt") assert os.path.isfile("test-datastore/notification.txt")
with open("test-datastore/notification.txt", 'r') as f: with open("test-datastore/notification.txt", 'r') as f:
notification = f.read() notification = f.read()
@ -126,7 +140,7 @@ def run_filter_test(client, live_server, content_filter):
set_response_with_filter() set_response_with_filter()
# Try several times, it should NOT have 'filter not found' # Try several times, it should NOT have 'filter not found'
for i in range(0, App._FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT): for i in range(0, ATTEMPT_THRESHOLD_SETTING + 2):
client.get(url_for("form_watch_checknow"), follow_redirects=True) client.get(url_for("form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client) wait_for_all_checks(client)
@ -139,9 +153,6 @@ def run_filter_test(client, live_server, content_filter):
assert not 'CSS/xPath filter was not present in the page' in notification assert not 'CSS/xPath filter was not present in the page' in notification
# Re #1247 - All tokens got replaced correctly in the notification # Re #1247 - All tokens got replaced correctly in the notification
res = client.get(url_for("index"))
uuid = extract_UUID_from_client(client)
# UUID is correct, but notification contains tag uuid as UUIID wtf
assert uuid in notification assert uuid in notification
# cleanup for the next # cleanup for the next
@ -156,9 +167,11 @@ def test_setup(live_server):
live_server_setup(live_server) live_server_setup(live_server)
def test_check_include_filters_failure_notification(client, live_server, measure_memory_usage): def test_check_include_filters_failure_notification(client, live_server, measure_memory_usage):
# live_server_setup(live_server)
run_filter_test(client, live_server,'#nope-doesnt-exist') run_filter_test(client, live_server,'#nope-doesnt-exist')
def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage): def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage):
# live_server_setup(live_server)
run_filter_test(client, live_server, '//*[@id="nope-doesnt-exist"]') run_filter_test(client, live_server, '//*[@id="nope-doesnt-exist"]')
# Test that notification is never sent # Test that notification is never sent

@ -146,14 +146,13 @@ def _run_test_minmax_limit(client, extra_watch_edit_form):
data={"url": test_url, "tags": 'restock tests', 'processor': 'restock_diff'}, data={"url": test_url, "tags": 'restock tests', 'processor': 'restock_diff'},
follow_redirects=True follow_redirects=True
) )
# A change in price, should trigger a change by default
wait_for_all_checks(client) wait_for_all_checks(client)
data = { data = {
"tags": "", "tags": "",
"url": test_url, "url": test_url,
"headers": "", "headers": "",
"time_between_check-hours": 5,
'fetch_backend': "html_requests" 'fetch_backend': "html_requests"
} }
data.update(extra_watch_edit_form) data.update(extra_watch_edit_form)
@ -178,12 +177,9 @@ def _run_test_minmax_limit(client, extra_watch_edit_form):
assert b'1,000.45' or b'1000.45' in res.data #depending on locale assert b'1,000.45' or b'1000.45' in res.data #depending on locale
assert b'unviewed' not in res.data assert b'unviewed' not in res.data
# price changed to something LESS than min (900), SHOULD be a change # price changed to something LESS than min (900), SHOULD be a change
set_original_response(props_markup=instock_props[0], price='890.45') set_original_response(props_markup=instock_props[0], price='890.45')
# let previous runs wait
time.sleep(2)
res = client.get(url_for("form_watch_checknow"), follow_redirects=True) res = client.get(url_for("form_watch_checknow"), follow_redirects=True)
assert b'1 watches queued for rechecking.' in res.data assert b'1 watches queued for rechecking.' in res.data
wait_for_all_checks(client) wait_for_all_checks(client)

@ -189,7 +189,9 @@ class update_worker(threading.Thread):
'screenshot': None 'screenshot': None
}) })
self.notification_q.put(n_object) self.notification_q.put(n_object)
logger.error(f"Sent filter not found notification for {watch_uuid}") logger.debug(f"Sent filter not found notification for {watch_uuid}")
else:
logger.debug(f"NOT sending filter not found notification for {watch_uuid} - no notification URLs")
def send_step_failure_notification(self, watch_uuid, step_n): def send_step_failure_notification(self, watch_uuid, step_n):
watch = self.datastore.data['watching'].get(watch_uuid, False) watch = self.datastore.data['watching'].get(watch_uuid, False)
@ -364,18 +366,22 @@ class update_worker(threading.Thread):
# Only when enabled, send the notification # Only when enabled, send the notification
if watch.get('filter_failure_notification_send', False): if watch.get('filter_failure_notification_send', False):
c = watch.get('consecutive_filter_failures', 5) c = watch.get('consecutive_filter_failures', 0)
c += 1 c += 1
# Send notification if we reached the threshold? # Send notification if we reached the threshold?
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
0) logger.debug(f"Filter for {uuid} not found, consecutive_filter_failures: {c} of threshold {threshold}")
logger.warning(f"Filter for {uuid} not found, consecutive_filter_failures: {c}") if c >= threshold:
if threshold > 0 and c >= threshold:
if not watch.get('notification_muted'): if not watch.get('notification_muted'):
logger.debug(f"Sending filter failed notification for {uuid}")
self.send_filter_failure_notification(uuid) self.send_filter_failure_notification(uuid)
c = 0 c = 0
logger.debug(f"Reset filter failure count back to zero")
self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c}) self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
else:
logger.trace(f"{uuid} - filter_failure_notification_send not enabled, skipping")
process_changedetection_results = False process_changedetection_results = False
@ -422,7 +428,7 @@ class update_worker(threading.Thread):
) )
if watch.get('filter_failure_notification_send', False): if watch.get('filter_failure_notification_send', False):
c = watch.get('consecutive_filter_failures', 5) c = watch.get('consecutive_filter_failures', 0)
c += 1 c += 1
# Send notification if we reached the threshold? # Send notification if we reached the threshold?
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts',

Loading…
Cancel
Save