RSS Fetching - Handle CDATA (commented out text) in RSS correctly, generally handle RSS better (#1866)

pull/1867/head
dgtlmoon 1 year ago committed by GitHub
parent 9cb636e638
commit f707c914b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,9 +1,12 @@
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from inscriptis import get_text from inscriptis import get_text
from inscriptis.model.config import ParserConfig
from jsonpath_ng.ext import parse from jsonpath_ng.ext import parse
from typing import List from typing import List
from inscriptis.css_profiles import CSS_PROFILES, HtmlElement
from inscriptis.html_properties import Display
from inscriptis.model.config import ParserConfig
from xml.sax.saxutils import escape as xml_escape
import json import json
import re import re
@ -68,10 +71,15 @@ def element_removal(selectors: List[str], html_content):
# Return str Utf-8 of matched rules # Return str Utf-8 of matched rules
def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False): def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False, is_rss=False):
from lxml import etree, html from lxml import etree, html
tree = html.fromstring(bytes(html_content, encoding='utf-8')) parser = None
if is_rss:
# So that we can keep CDATA for cdata_in_document_to_text() to process
parser = etree.XMLParser(strip_cdata=False)
tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser)
html_block = "" html_block = ""
r = tree.xpath(xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'}) r = tree.xpath(xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'})
@ -90,11 +98,13 @@ def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False
elif type(element) == etree._ElementUnicodeResult: elif type(element) == etree._ElementUnicodeResult:
html_block += str(element) html_block += str(element)
else: else:
html_block += etree.tostring(element, pretty_print=True).decode('utf-8') if not is_rss:
html_block += etree.tostring(element, pretty_print=True).decode('utf-8')
else:
html_block += f"<div>{element.text}</div>\n"
return html_block return html_block
# Extract/find element # Extract/find element
def extract_element(find='title', html_content=''): def extract_element(find='title', html_content=''):
@ -260,8 +270,15 @@ def strip_ignore_text(content, wordlist, mode="content"):
return "\n".encode('utf8').join(output) return "\n".encode('utf8').join(output)
def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False) -> str:
pattern = '<!\[CDATA\[(\s*(?:.(?<!\]\]>)\s*)*)\]\]>'
def repl(m):
text = m.group(1)
return xml_escape(html_to_text(html_content=text))
def html_to_text(html_content: str, render_anchor_tag_content=False) -> str: return re.sub(pattern, repl, html_content)
def html_to_text(html_content: str, render_anchor_tag_content=False, is_rss=False) -> str:
"""Converts html string to a string with just the text. If ignoring """Converts html string to a string with just the text. If ignoring
rendering anchor tag content is enable, anchor tag content are also rendering anchor tag content is enable, anchor tag content are also
included in the text included in the text
@ -277,17 +294,22 @@ def html_to_text(html_content: str, render_anchor_tag_content=False) -> str:
# if anchor tag content flag is set to True define a config for # if anchor tag content flag is set to True define a config for
# extracting this content # extracting this content
if render_anchor_tag_content: if render_anchor_tag_content:
parser_config = ParserConfig( parser_config = ParserConfig(
annotation_rules={"a": ["hyperlink"]}, display_links=True annotation_rules={"a": ["hyperlink"]}, display_links=True
) )
# otherwise set config to None/default
# otherwise set config to None
else: else:
parser_config = None parser_config = None
# get text and annotations via inscriptis # RSS Mode - Inscriptis will treat `title` as something else.
text_content = get_text(html_content, config=parser_config) # Make it as a regular block display element (//item/title)
if is_rss:
css = CSS_PROFILES['strict'].copy()
css['title'] = HtmlElement(display=Display.block)
text_content = get_text(html_content, ParserConfig(css=css))
else:
# get text and annotations via inscriptis
text_content = get_text(html_content, config=parser_config)
return text_content return text_content

@ -11,7 +11,7 @@ from changedetectionio import content_fetcher, html_tools
from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT
from copy import deepcopy from copy import deepcopy
from . import difference_detection_processor from . import difference_detection_processor
from ..html_tools import PERL_STYLE_REGEX from ..html_tools import PERL_STYLE_REGEX, cdata_in_document_to_text
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@ -153,6 +153,14 @@ class perform_site_check(difference_detection_processor):
is_json = 'application/json' in fetcher.get_all_headers().get('content-type', '').lower() is_json = 'application/json' in fetcher.get_all_headers().get('content-type', '').lower()
is_html = not is_json is_html = not is_json
is_rss = False
ctype_header = fetcher.get_all_headers().get('content-type', '').lower()
# Go into RSS preprocess for converting CDATA/comment to usable text
if any(substring in ctype_header for substring in ['application/xml', 'application/rss', 'text/xml']):
if '<rss' in fetcher.content[:100].lower():
fetcher.content = cdata_in_document_to_text(html_content=fetcher.content)
is_rss = True
# source: support, basically treat it as plaintext # source: support, basically treat it as plaintext
if is_source: if is_source:
@ -242,7 +250,8 @@ class perform_site_check(difference_detection_processor):
if filter_rule[0] == '/' or filter_rule.startswith('xpath:'): if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
html_content += html_tools.xpath_filter(xpath_filter=filter_rule.replace('xpath:', ''), html_content += html_tools.xpath_filter(xpath_filter=filter_rule.replace('xpath:', ''),
html_content=fetcher.content, html_content=fetcher.content,
append_pretty_line_formatting=not is_source) append_pretty_line_formatting=not is_source,
is_rss=is_rss)
else: else:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
html_content += html_tools.include_filters(include_filters=filter_rule, html_content += html_tools.include_filters(include_filters=filter_rule,
@ -262,8 +271,9 @@ class perform_site_check(difference_detection_processor):
do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False) do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
stripped_text_from_html = \ stripped_text_from_html = \
html_tools.html_to_text( html_tools.html_to_text(
html_content, html_content=html_content,
render_anchor_tag_content=do_anchor render_anchor_tag_content=do_anchor,
is_rss=is_rss
) )
# Re #340 - return the content before the 'ignore text' was applied # Re #340 - return the content before the 'ignore text' was applied

@ -2,12 +2,61 @@
import time import time
from flask import url_for from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client
def set_original_cdata_xml():
test_return_data = """<rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:media="http://search.yahoo.com/mrss/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
<channel>
<title>Gizi</title>
<link>https://test.com</link>
<atom:link href="https://testsite.com" rel="self" type="application/rss+xml"/>
<description>
<![CDATA[ The Future Could Be Here ]]>
</description>
<language>en</language>
<item>
<title>
<![CDATA[ <img src="https://testsite.com/hacked.jpg"> Hackers can access your computer ]]>
</title>
<link>https://testsite.com/news/12341234234</link>
<description>
<![CDATA[ <img class="type:primaryImage" src="https://testsite.com/701c981da04869e.jpg"/><p>The days of Terminator and The Matrix could be closer. But be positive.</p><p><a href="https://testsite.com">Read more link...</a></p> ]]>
</description>
<category>cybernetics</category>
<category>rand corporation</category>
<pubDate>Tue, 17 Oct 2023 15:10:00 GMT</pubDate>
<guid isPermaLink="false">1850933241</guid>
<dc:creator>
<![CDATA[ Mr Hacker News ]]>
</dc:creator>
<media:thumbnail url="https://testsite.com/thumbnail-c224e10d81488e818701c981da04869e.jpg"/>
</item>
<item>
<title> Some other title </title>
<link>https://testsite.com/news/12341234236</link>
<description>
Some other description
</description>
</item>
</channel>
</rss>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
def test_setup(client, live_server):
live_server_setup(live_server)
def test_rss_and_token(client, live_server): def test_rss_and_token(client, live_server):
# live_server_setup(live_server)
set_original_response() set_original_response()
live_server_setup(live_server) rss_token = extract_rss_token_from_UI(client)
# Add our URL to the import page # Add our URL to the import page
res = client.post( res = client.post(
@ -17,11 +66,11 @@ def test_rss_and_token(client, live_server):
) )
assert b"1 Imported" in res.data assert b"1 Imported" in res.data
rss_token = extract_rss_token_from_UI(client)
time.sleep(2) wait_for_all_checks(client)
set_modified_response()
client.get(url_for("form_watch_checknow"), follow_redirects=True) client.get(url_for("form_watch_checknow"), follow_redirects=True)
time.sleep(2) wait_for_all_checks(client)
# Add our URL to the import page # Add our URL to the import page
res = client.get( res = client.get(
@ -37,3 +86,77 @@ def test_rss_and_token(client, live_server):
) )
assert b"Access denied, bad token" not in res.data assert b"Access denied, bad token" not in res.data
assert b"Random content" in res.data assert b"Random content" in res.data
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
def test_basic_cdata_rss_markup(client, live_server):
#live_server_setup(live_server)
set_original_cdata_xml()
test_url = url_for('test_endpoint', content_type="application/xml", _external=True)
# Add our URL to the import page
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True
)
assert b'CDATA' not in res.data
assert b'<![' not in res.data
assert b'Hackers can access your computer' in res.data
assert b'The days of Terminator' in res.data
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
def test_rss_xpath_filtering(client, live_server):
# live_server_setup(live_server)
set_original_cdata_xml()
test_url = url_for('test_endpoint', content_type="application/xml", _external=True)
res = client.post(
url_for("form_quick_watch_add"),
data={"url": test_url, "tags": '', 'edit_and_watch_submit_button': 'Edit > Watch'},
follow_redirects=True
)
assert b"Watch added in Paused state, saving will unpause" in res.data
uuid = extract_UUID_from_client(client)
res = client.post(
url_for("edit_page", uuid=uuid, unpause_on_save=1),
data={
"include_filters": "//item/title",
"fetch_backend": "html_requests",
"headers": "",
"proxy": "no-proxy",
"tags": "",
"url": test_url,
},
follow_redirects=True
)
assert b"unpaused" in res.data
wait_for_all_checks(client)
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True
)
assert b'CDATA' not in res.data
assert b'<![' not in res.data
assert b'Hackers can access your computer' in res.data # Should ONLY be selected by the xpath
assert b'Some other title' in res.data # Should ONLY be selected by the xpath
assert b'The days of Terminator' not in res.data # Should NOT be selected by the xpath
assert b'Some other description' not in res.data # Should NOT be selected by the xpath
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)

Loading…
Cancel
Save