diff --git a/changedetectionio/html_tools.py b/changedetectionio/html_tools.py index 8df14f32..bd9a45d7 100644 --- a/changedetectionio/html_tools.py +++ b/changedetectionio/html_tools.py @@ -1,9 +1,12 @@ from bs4 import BeautifulSoup from inscriptis import get_text -from inscriptis.model.config import ParserConfig from jsonpath_ng.ext import parse from typing import List +from inscriptis.css_profiles import CSS_PROFILES, HtmlElement +from inscriptis.html_properties import Display +from inscriptis.model.config import ParserConfig +from xml.sax.saxutils import escape as xml_escape import json import re @@ -68,10 +71,15 @@ def element_removal(selectors: List[str], html_content): # Return str Utf-8 of matched rules -def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False): +def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False, is_rss=False): from lxml import etree, html - tree = html.fromstring(bytes(html_content, encoding='utf-8')) + parser = None + if is_rss: + # So that we can keep CDATA for cdata_in_document_to_text() to process + parser = etree.XMLParser(strip_cdata=False) + + tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser) html_block = "" r = tree.xpath(xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'}) @@ -90,11 +98,13 @@ def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False elif type(element) == etree._ElementUnicodeResult: html_block += str(element) else: - html_block += etree.tostring(element, pretty_print=True).decode('utf-8') + if not is_rss: + html_block += etree.tostring(element, pretty_print=True).decode('utf-8') + else: + html_block += f"
{element.text}
\n" return html_block - # Extract/find element def extract_element(find='title', html_content=''): @@ -260,8 +270,15 @@ def strip_ignore_text(content, wordlist, mode="content"): return "\n".encode('utf8').join(output) +def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False) -> str: + pattern = ')\s*)*)\]\]>' + def repl(m): + text = m.group(1) + return xml_escape(html_to_text(html_content=text)) -def html_to_text(html_content: str, render_anchor_tag_content=False) -> str: + return re.sub(pattern, repl, html_content) + +def html_to_text(html_content: str, render_anchor_tag_content=False, is_rss=False) -> str: """Converts html string to a string with just the text. If ignoring rendering anchor tag content is enable, anchor tag content are also included in the text @@ -277,17 +294,22 @@ def html_to_text(html_content: str, render_anchor_tag_content=False) -> str: # if anchor tag content flag is set to True define a config for # extracting this content if render_anchor_tag_content: - parser_config = ParserConfig( annotation_rules={"a": ["hyperlink"]}, display_links=True ) - - # otherwise set config to None + # otherwise set config to None/default else: parser_config = None - # get text and annotations via inscriptis - text_content = get_text(html_content, config=parser_config) + # RSS Mode - Inscriptis will treat `title` as something else. + # Make it as a regular block display element (//item/title) + if is_rss: + css = CSS_PROFILES['strict'].copy() + css['title'] = HtmlElement(display=Display.block) + text_content = get_text(html_content, ParserConfig(css=css)) + else: + # get text and annotations via inscriptis + text_content = get_text(html_content, config=parser_config) return text_content diff --git a/changedetectionio/processors/text_json_diff.py b/changedetectionio/processors/text_json_diff.py index bada0a1d..5c5e81b7 100644 --- a/changedetectionio/processors/text_json_diff.py +++ b/changedetectionio/processors/text_json_diff.py @@ -11,7 +11,7 @@ from changedetectionio import content_fetcher, html_tools from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT from copy import deepcopy from . import difference_detection_processor -from ..html_tools import PERL_STYLE_REGEX +from ..html_tools import PERL_STYLE_REGEX, cdata_in_document_to_text urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -153,6 +153,14 @@ class perform_site_check(difference_detection_processor): is_json = 'application/json' in fetcher.get_all_headers().get('content-type', '').lower() is_html = not is_json + is_rss = False + + ctype_header = fetcher.get_all_headers().get('content-type', '').lower() + # Go into RSS preprocess for converting CDATA/comment to usable text + if any(substring in ctype_header for substring in ['application/xml', 'application/rss', 'text/xml']): + if ' + + Gizi + https://test.com + + + + + en + + + <![CDATA[ <img src="https://testsite.com/hacked.jpg"> Hackers can access your computer ]]> + + https://testsite.com/news/12341234234 + +

The days of Terminator and The Matrix could be closer. But be positive.

Read more link...

]]> +
+ cybernetics + rand corporation + Tue, 17 Oct 2023 15:10:00 GMT + 1850933241 + + + + +
+ + + Some other title + https://testsite.com/news/12341234236 + + Some other description + + +
+ + """ + + with open("test-datastore/endpoint-content.txt", "w") as f: + f.write(test_return_data) + + +def test_setup(client, live_server): + live_server_setup(live_server) + def test_rss_and_token(client, live_server): + # live_server_setup(live_server) + set_original_response() - live_server_setup(live_server) + rss_token = extract_rss_token_from_UI(client) # Add our URL to the import page res = client.post( @@ -17,11 +66,11 @@ def test_rss_and_token(client, live_server): ) assert b"1 Imported" in res.data - rss_token = extract_rss_token_from_UI(client) - time.sleep(2) + wait_for_all_checks(client) + set_modified_response() client.get(url_for("form_watch_checknow"), follow_redirects=True) - time.sleep(2) + wait_for_all_checks(client) # Add our URL to the import page res = client.get( @@ -37,3 +86,77 @@ def test_rss_and_token(client, live_server): ) assert b"Access denied, bad token" not in res.data assert b"Random content" in res.data + + res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True) + +def test_basic_cdata_rss_markup(client, live_server): + #live_server_setup(live_server) + + set_original_cdata_xml() + + test_url = url_for('test_endpoint', content_type="application/xml", _external=True) + + # Add our URL to the import page + res = client.post( + url_for("import_page"), + data={"urls": test_url}, + follow_redirects=True + ) + + assert b"1 Imported" in res.data + + wait_for_all_checks(client) + + res = client.get( + url_for("preview_page", uuid="first"), + follow_redirects=True + ) + assert b'CDATA' not in res.data + assert b' Watch'}, + follow_redirects=True + ) + assert b"Watch added in Paused state, saving will unpause" in res.data + + uuid = extract_UUID_from_client(client) + res = client.post( + url_for("edit_page", uuid=uuid, unpause_on_save=1), + data={ + "include_filters": "//item/title", + "fetch_backend": "html_requests", + "headers": "", + "proxy": "no-proxy", + "tags": "", + "url": test_url, + }, + follow_redirects=True + ) + assert b"unpaused" in res.data + + wait_for_all_checks(client) + + res = client.get( + url_for("preview_page", uuid="first"), + follow_redirects=True + ) + assert b'CDATA' not in res.data + assert b'