Merge branch 'master' into restock-visualselector-refactor

pull/2185/head
dgtlmoon 2 months ago committed by GitHub
commit 8a2afaa712
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -72,7 +72,11 @@ jobs:
run: |
# Playwright via Sockpuppetbrowser fetch
# tests/visualselector/test_fetch_data.py will do browser steps
docker run --rm -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio;pytest tests/fetchers/test_content.py && pytest tests/test_errorhandling.py && pytest tests/visualselector/test_fetch_data.py'
docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/fetchers/test_content.py'
docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/test_errorhandling.py'
docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/visualselector/test_fetch_data.py'
docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/fetchers/test_custom_js_before_content.py'
- name: Playwright and SocketPuppetBrowser - Headers and requests
run: |
@ -87,8 +91,11 @@ jobs:
# STRAIGHT TO CDP
- name: Pyppeteer and SocketPuppetBrowser - Specific tests in built container
run: |
# Playwright via Sockpuppetbrowser fetch
docker run --rm -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" -e "FAST_PUPPETEER_CHROME_FETCHER=True" --network changedet-network test-changedetectionio bash -c 'cd changedetectionio;pytest tests/fetchers/test_content.py && pytest tests/test_errorhandling.py && pytest tests/visualselector/test_fetch_data.py'
# Playwright via Sockpuppetbrowser fetch
docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/fetchers/test_content.py'
docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/test_errorhandling.py'
docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/visualselector/test_fetch_data.py'
docker run --rm -e "FLASK_SERVER_NAME=cdio" -e "FAST_PUPPETEER_CHROME_FETCHER=True" -e "PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000" --network changedet-network --hostname=cdio test-changedetectionio bash -c 'cd changedetectionio;pytest --live-server-host=0.0.0.0 --live-server-port=5004 tests/fetchers/test_custom_js_before_content.py'
- name: Pyppeteer and SocketPuppetBrowser - Headers and requests checks
run: |

@ -6,6 +6,8 @@ import re
from random import randint
from loguru import logger
from changedetectionio.content_fetchers.base import manage_user_agent
# Two flags, tell the JS which of the "Selector" or "Value" field should be enabled in the front end
# 0- off, 1- on
browser_step_ui_config = {'Choose one': '0 0',
@ -178,6 +180,7 @@ class browsersteps_live_ui(steppable_browser_interface):
stale = False
# bump and kill this if idle after X sec
age_start = 0
headers = {}
# use a special driver, maybe locally etc
command_executor = os.getenv(
@ -192,7 +195,8 @@ class browsersteps_live_ui(steppable_browser_interface):
browser_type = os.getenv("PLAYWRIGHT_BROWSER_TYPE", 'chromium').strip('"')
def __init__(self, playwright_browser, proxy=None):
def __init__(self, playwright_browser, proxy=None, headers=None):
self.headers = headers or {}
self.age_start = time.time()
self.playwright_browser = playwright_browser
if self.context is None:
@ -206,16 +210,17 @@ class browsersteps_live_ui(steppable_browser_interface):
# @todo handle multiple contexts, bind a unique id from the browser on each req?
self.context = self.playwright_browser.new_context(
# @todo
# user_agent=request_headers['User-Agent'] if request_headers.get('User-Agent') else 'Mozilla/5.0',
# proxy=self.proxy,
# This is needed to enable JavaScript execution on GitHub and others
bypass_csp=True,
# Should never be needed
accept_downloads=False,
proxy=proxy
accept_downloads=False, # Should never be needed
bypass_csp=True, # This is needed to enable JavaScript execution on GitHub and others
extra_http_headers=self.headers,
ignore_https_errors=True,
proxy=proxy,
service_workers=os.getenv('PLAYWRIGHT_SERVICE_WORKERS', 'allow'),
# Should be `allow` or `block` - sites like YouTube can transmit large amounts of data via Service Workers
user_agent=manage_user_agent(headers=self.headers),
)
self.page = self.context.new_page()
# self.page.set_default_navigation_timeout(keep_open)

@ -1,5 +1,4 @@
from playwright.sync_api import PlaywrightContextManager
import asyncio
# So playwright wants to run as a context manager, but we do something horrible and hacky
# we are holding the session open for as long as possible, then shutting it down, and opening a new one

@ -11,9 +11,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
def tags_overview_page():
from .form import SingleTag
add_form = SingleTag(request.form)
sorted_tags = sorted(datastore.data['settings']['application'].get('tags').items(), key=lambda x: x[1]['title'])
output = render_template("groups-overview.html",
form=add_form,
available_tags=datastore.data['settings']['application'].get('tags', {}),
available_tags=sorted_tags,
)
return output

@ -40,7 +40,7 @@
<td colspan="3">No website organisational tags/groups configured</td>
</tr>
{% endif %}
{% for uuid, tag in available_tags.items() %}
{% for uuid, tag in available_tags %}
<tr id="{{ uuid }}" class="{{ loop.cycle('pure-table-odd', 'pure-table-even') }}">
<td class="watch-controls">
<a class="link-mute state-{{'on' if tag.notification_muted else 'off'}}" href="{{url_for('tags.mute', uuid=tag.uuid)}}"><img src="{{url_for('static_content', group='images', filename='bell-off.svg')}}" alt="Mute notifications" title="Mute notifications" class="icon icon-mute" ></a>

@ -1,6 +1,6 @@
import sys
from distutils.util import strtobool
from loguru import logger
from changedetectionio.content_fetchers.exceptions import BrowserStepsStepException
import os
@ -29,10 +29,15 @@ def available_fetchers():
# rather than site-specific.
use_playwright_as_chrome_fetcher = os.getenv('PLAYWRIGHT_DRIVER_URL', False)
if use_playwright_as_chrome_fetcher:
# @note - For now, browser steps always uses playwright
if not strtobool(os.getenv('FAST_PUPPETEER_CHROME_FETCHER', 'False')):
logger.debug('Using Playwright library as fetcher')
from .playwright import fetcher as html_webdriver
else:
logger.debug('Using direct Python Puppeteer library as fetcher')
from .puppeteer import fetcher as html_webdriver
else:
logger.debug("Falling back to selenium as fetcher")
from .webdriver_selenium import fetcher as html_webdriver

@ -5,6 +5,40 @@ from loguru import logger
from changedetectionio.content_fetchers import BrowserStepsStepException
def manage_user_agent(headers, current_ua=''):
"""
Basic setting of user-agent
NOTE!!!!!! The service that does the actual Chrome fetching should handle any anti-robot techniques
THERE ARE MANY WAYS THAT IT CAN BE DETECTED AS A ROBOT!!
This does not take care of
- Scraping of 'navigator' (platform, productSub, vendor, oscpu etc etc) browser object (navigator.appVersion) etc
- TCP/IP fingerprint JA3 etc
- Graphic rendering fingerprinting
- Your IP being obviously in a pool of bad actors
- Too many requests
- Scraping of SCH-UA browser replies (thanks google!!)
- Scraping of ServiceWorker, new window calls etc
See https://filipvitas.medium.com/how-to-set-user-agent-header-with-puppeteer-js-and-not-fail-28c7a02165da
Puppeteer requests https://github.com/dgtlmoon/pyppeteerstealth
:param page:
:param headers:
:return:
"""
# Ask it what the user agent is, if its obviously ChromeHeadless, switch it to the default
ua_in_custom_headers = next((v for k, v in headers.items() if k.lower() == "user-agent"), None)
if ua_in_custom_headers:
return ua_in_custom_headers
if not ua_in_custom_headers and current_ua:
current_ua = current_ua.replace('HeadlessChrome', 'Chrome')
return current_ua
return None
class Fetcher():
browser_connection_is_custom = None
browser_connection_url = None

@ -3,7 +3,8 @@ import os
from urllib.parse import urlparse
from loguru import logger
from changedetectionio.content_fetchers.base import Fetcher
from changedetectionio.content_fetchers.base import Fetcher, manage_user_agent
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, ScreenshotUnavailable
class fetcher(Fetcher):
@ -102,19 +103,16 @@ class fetcher(Fetcher):
# Set user agent to prevent Cloudflare from blocking the browser
# Use the default one configured in the App.py model that's passed from fetch_site_status.py
context = browser.new_context(
user_agent={k.lower(): v for k, v in request_headers.items()}.get('user-agent', None),
accept_downloads=False, # Should never be needed
bypass_csp=True, # This is needed to enable JavaScript execution on GitHub and others
extra_http_headers=request_headers,
ignore_https_errors=True,
proxy=self.proxy,
# This is needed to enable JavaScript execution on GitHub and others
bypass_csp=True,
# Should be `allow` or `block` - sites like YouTube can transmit large amounts of data via Service Workers
service_workers=os.getenv('PLAYWRIGHT_SERVICE_WORKERS', 'allow'),
# Should never be needed
accept_downloads=False
service_workers=os.getenv('PLAYWRIGHT_SERVICE_WORKERS', 'allow'), # Should be `allow` or `block` - sites like YouTube can transmit large amounts of data via Service Workers
user_agent=manage_user_agent(headers=request_headers),
)
self.page = context.new_page()
if len(request_headers):
context.set_extra_http_headers(request_headers)
# Listen for all console events and handle errors
self.page.on("console", lambda msg: print(f"Playwright console: Watch URL: {url} {msg.type}: {msg.text} {msg.args}"))

@ -5,7 +5,8 @@ import websockets.exceptions
from urllib.parse import urlparse
from loguru import logger
from changedetectionio.content_fetchers.base import Fetcher
from changedetectionio.content_fetchers.base import Fetcher, manage_user_agent
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, BrowserFetchTimedOut, BrowserConnectError
@ -100,10 +101,11 @@ class fetcher(Fetcher):
else:
self.page = await browser.newPage()
await self.page.setUserAgent(manage_user_agent(headers=request_headers, current_ua=await self.page.evaluate('navigator.userAgent')))
await self.page.setBypassCSP(True)
if request_headers:
await self.page.setExtraHTTPHeaders(request_headers)
# @todo check user-agent worked
# SOCKS5 with authentication is not supported (yet)
# https://github.com/microsoft/playwright/issues/10567
@ -212,8 +214,12 @@ class fetcher(Fetcher):
logger.error('ERROR: Failed to get viewport-only reduced screenshot :(')
pass
finally:
# It's good to log here in the case that the browser crashes on shutting down but we still get the data we need
logger.success(f"Fetching '{url}' complete, closing page")
await self.page.close()
logger.success(f"Fetching '{url}' complete, closing browser")
await browser.close()
logger.success(f"Fetching '{url}' complete, exiting puppeteer fetch.")
async def main(self, **kwargs):
await self.fetch_page(**kwargs)

@ -10,7 +10,7 @@ function isItemInStock() {
const outOfStockTexts = [
' أخبرني عندما يتوفر',
'0 in stock',
'actuellement indisponible',
'actuellement indisponible',
'agotado',
'article épuisé',
'artikel zurzeit vergriffen',
@ -144,7 +144,7 @@ function isItemInStock() {
if (elementText.length) {
// try which ones could mean its in stock
if (negateOutOfStockRegex.test(elementText)) {
if (negateOutOfStockRegex.test(elementText) && !elementText.includes('(0 products)')) {
console.log(`Negating/overriding 'Out of Stock' back to "Possibly in stock" found "${elementText}"`)
return 'Possibly in stock';
}
@ -156,7 +156,9 @@ function isItemInStock() {
const element = elementsToScan[i];
// outside the 'fold' or some weird text in the heading area
// .getBoundingClientRect() was causing a crash in chrome 119, can only be run on contentVisibility != hidden
if (element.getBoundingClientRect().top + window.scrollY >= vh || element.getBoundingClientRect().top + window.scrollY <= 100) {
// Should be in the "above the fold" plus about 150px
if (element.getBoundingClientRect().top + window.scrollY >= vh + 150 || element.getBoundingClientRect().top + window.scrollY <= 100) {
continue
}
elementText = "";

@ -404,17 +404,21 @@ def changedetection_app(config=None, datastore_o=None):
global datastore
from changedetectionio import forms
limit_tag = request.args.get('tag', '').lower().strip()
active_tag_req = request.args.get('tag', '').lower().strip()
active_tag_uuid = active_tag = None
# Be sure limit_tag is a uuid
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
if limit_tag == tag.get('title', '').lower().strip():
limit_tag = uuid
if active_tag_req:
for uuid, tag in datastore.data['settings']['application'].get('tags', {}).items():
if active_tag_req == tag.get('title', '').lower().strip() or active_tag_req == uuid:
active_tag = tag
active_tag_uuid = uuid
break
# Redirect for the old rss path which used the /?rss=true
if request.args.get('rss'):
return redirect(url_for('rss', tag=limit_tag))
return redirect(url_for('rss', tag=active_tag_uuid))
op = request.args.get('op')
if op:
@ -425,7 +429,7 @@ def changedetection_app(config=None, datastore_o=None):
datastore.data['watching'][uuid].toggle_mute()
datastore.needs_write = True
return redirect(url_for('index', tag = limit_tag))
return redirect(url_for('index', tag = active_tag_uuid))
# Sort by last_changed and add the uuid which is usually the key..
sorted_watches = []
@ -436,7 +440,7 @@ def changedetection_app(config=None, datastore_o=None):
if with_errors and not watch.get('last_error'):
continue
if limit_tag and not limit_tag in watch['tags']:
if active_tag_uuid and not active_tag_uuid in watch['tags']:
continue
if watch.get('last_error'):
errored_count += 1
@ -455,11 +459,12 @@ def changedetection_app(config=None, datastore_o=None):
total=total_count,
per_page=datastore.data['settings']['application'].get('pager_size', 50), css_framework="semantic")
sorted_tags = sorted(datastore.data['settings']['application'].get('tags').items(), key=lambda x: x[1]['title'])
output = render_template(
"watch-overview.html",
# Don't link to hosting when we're on the hosting environment
active_tag=limit_tag,
active_tag=active_tag,
active_tag_uuid=active_tag_uuid,
app_rss_token=datastore.data['settings']['application']['rss_access_token'],
datastore=datastore,
errored_count=errored_count,
@ -474,7 +479,7 @@ def changedetection_app(config=None, datastore_o=None):
sort_attribute=request.args.get('sort') if request.args.get('sort') else request.cookies.get('sort'),
sort_order=request.args.get('order') if request.args.get('order') else request.cookies.get('order'),
system_default_fetcher=datastore.data['settings']['application'].get('fetch_backend'),
tags=datastore.data['settings']['application'].get('tags'),
tags=sorted_tags,
watches=sorted_watches
)

@ -119,7 +119,7 @@ class perform_site_check(difference_detection_processor):
include_filters_from_tags = self.datastore.get_tag_overrides_for_watch(uuid=uuid, attr='include_filters')
# 1845 - remove duplicated filters in both group and watch include filter
include_filters_rule = list({*watch.get('include_filters', []), *include_filters_from_tags})
include_filters_rule = list(dict.fromkeys(watch.get('include_filters', []) + include_filters_from_tags))
subtractive_selectors = [*self.datastore.get_tag_overrides_for_watch(uuid=uuid, attr='subtractive_selectors'),
*watch.get("subtractive_selectors", []),

@ -160,6 +160,12 @@ $(document).ready(function () {
e.offsetX > item.left * y_scale && e.offsetX < item.left * y_scale + item.width * y_scale
) {
// Ignore really large ones, because we are scraping 'div' also from xpath_element_scraper but
// that div or whatever could be some wrapper and would generally make you select the whole page
if (item.width > 800 && item.height > 400) {
return
}
// There could be many elements here, record them all and then we'll find out which is the most 'useful'
// (input, textarea, button, A etc)
if (item.width < xpath_data['browser_width']) {

@ -1,6 +1,6 @@
{% extends 'base.html' %}
{% block content %}
{% from '_helpers.jinja' import render_simple_field, render_field, render_nolabel_field %}
{% from '_helpers.jinja' import render_simple_field, render_field, render_nolabel_field, sort_by_title %}
<script src="{{url_for('static_content', group='js', filename='jquery-3.6.0.min.js')}}"></script>
<script src="{{url_for('static_content', group='js', filename='watch-overview.js')}}" defer></script>
@ -13,7 +13,7 @@
<div id="watch-add-wrapper-zone">
{{ render_nolabel_field(form.url, placeholder="https://...", required=true) }}
{{ render_nolabel_field(form.tags, value=tags[active_tag].title if active_tag else '', placeholder="watch label / tag") }}
{{ render_nolabel_field(form.tags, value=active_tag.title if active_tag else '', placeholder="watch label / tag") }}
{{ render_nolabel_field(form.watch_submit_button, title="Watch this URL!" ) }}
{{ render_nolabel_field(form.edit_and_watch_submit_button, title="Edit first then Watch") }}
</div>
@ -46,11 +46,13 @@
{% if search_q %}<div id="search-result-info">Searching "<strong><i>{{search_q}}</i></strong>"</div>{% endif %}
<div>
<a href="{{url_for('index')}}" class="pure-button button-tag {{'active' if not active_tag }}">All</a>
{% for uuid, tag in tags.items() %}
{% if tag != "" %}
<a href="{{url_for('index', tag=uuid) }}" class="pure-button button-tag {{'active' if active_tag == uuid }}">{{ tag.title }}</a>
{% endif %}
{% endfor %}
<!-- tag list -->
{% for uuid, tag in tags %}
{% if tag != "" %}
<a href="{{url_for('index', tag=uuid) }}" class="pure-button button-tag {{'active' if active_tag_uuid == uuid }}">{{ tag.title }}</a>
{% endif %}
{% endfor %}
</div>
{% set sort_order = sort_order or 'asc' %}
@ -197,8 +199,8 @@
</li>
{% endif %}
<li>
<a href="{{ url_for('form_watch_checknow', tag=active_tag, with_errors=request.args.get('with_errors',0)) }}" class="pure-button button-tag ">Recheck
all {% if active_tag%} in "{{tags[active_tag].title}}"{%endif%}</a>
<a href="{{ url_for('form_watch_checknow', tag=active_tag_uuid, with_errors=request.args.get('with_errors',0)) }}" class="pure-button button-tag ">Recheck
all {% if active_tag_uuid %} in "{{active_tag.title}}"{%endif%}</a>
</li>
<li>
<a href="{{ url_for('rss', tag=active_tag , token=app_rss_token)}}"><img alt="RSS Feed" id="feed-icon" src="{{url_for('static_content', group='images', filename='Generic_Feed-icon.svg')}}" height="15"></a>

@ -0,0 +1,56 @@
import os
from flask import url_for
from ..util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
def test_execute_custom_js(client, live_server):
live_server_setup(live_server)
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
test_url = url_for('test_interactive_html_endpoint', _external=True)
test_url = test_url.replace('localhost.localdomain', 'cdio')
test_url = test_url.replace('localhost', 'cdio')
res = client.post(
url_for("form_quick_watch_add"),
data={"url": test_url, "tags": '', 'edit_and_watch_submit_button': 'Edit > Watch'},
follow_redirects=True
)
assert b"Watch added in Paused state, saving will unpause" in res.data
res = client.post(
url_for("edit_page", uuid="first", unpause_on_save=1),
data={
"url": test_url,
"tags": "",
'fetch_backend': "html_webdriver",
'webdriver_js_execute_code': 'document.querySelector("button[name=test-button]").click();',
'headers': "testheader: yes\buser-agent: MyCustomAgent",
},
follow_redirects=True
)
assert b"unpaused" in res.data
wait_for_all_checks(client)
uuid = extract_UUID_from_client(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)"
assert b"This text should be removed" not in res.data
# Check HTML conversion detected and workd
res = client.get(
url_for("preview_page", uuid=uuid),
follow_redirects=True
)
assert b"This text should be removed" not in res.data
assert b"I smell JavaScript because the button was pressed" in res.data
assert b"testheader: yes" in res.data
assert b"user-agent: mycustomagent" in res.data
client.get(
url_for("form_delete", uuid="all"),
follow_redirects=True
)

@ -321,3 +321,154 @@ def test_clone_tag_on_quickwatchform_add(client, live_server):
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
def test_order_of_filters_tag_filter_and_watch_filter(client, live_server):
# Add a tag with some config, import a tag and it should roughly work
res = client.post(
url_for("tags.form_tag_add"),
data={"name": "test-tag-keep-order"},
follow_redirects=True
)
assert b"Tag added" in res.data
assert b"test-tag-keep-order" in res.data
tag_filters = [
'#only-this', # duplicated filters
'#only-this',
'#only-this',
'#only-this',
]
res = client.post(
url_for("tags.form_tag_edit_submit", uuid="first"),
data={"name": "test-tag-keep-order",
"include_filters": '\n'.join(tag_filters) },
follow_redirects=True
)
assert b"Updated" in res.data
tag_uuid = get_UUID_for_tag_name(client, name="test-tag-keep-order")
res = client.get(
url_for("tags.form_tag_edit", uuid="first")
)
assert b"#only-this" in res.data
d = """<html>
<body>
Some initial text<br>
<p id="only-this">And 1 this</p>
<br>
<p id="not-this">And 2 this</p>
<p id="">And 3 this</p><!--/html/body/p[3]/-->
<p id="">And 4 this</p><!--/html/body/p[4]/-->
<p id="">And 5 this</p><!--/html/body/p[5]/-->
<p id="">And 6 this</p><!--/html/body/p[6]/-->
<p id="">And 7 this</p><!--/html/body/p[7]/-->
<p id="">And 8 this</p><!--/html/body/p[8]/-->
<p id="">And 9 this</p><!--/html/body/p[9]/-->
<p id="">And 10 this</p><!--/html/body/p[10]/-->
<p id="">And 11 this</p><!--/html/body/p[11]/-->
<p id="">And 12 this</p><!--/html/body/p[12]/-->
<p id="">And 13 this</p><!--/html/body/p[13]/-->
<p id="">And 14 this</p><!--/html/body/p[14]/-->
<p id="not-this">And 15 this</p><!--/html/body/p[15]/-->
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(d)
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
filters = [
'/html/body/p[3]',
'/html/body/p[4]',
'/html/body/p[5]',
'/html/body/p[6]',
'/html/body/p[7]',
'/html/body/p[8]',
'/html/body/p[9]',
'/html/body/p[10]',
'/html/body/p[11]',
'/html/body/p[12]',
'/html/body/p[13]', # duplicated tags
'/html/body/p[13]',
'/html/body/p[13]',
'/html/body/p[13]',
'/html/body/p[13]',
'/html/body/p[14]',
]
res = client.post(
url_for("edit_page", uuid="first"),
data={"include_filters": '\n'.join(filters),
"url": test_url,
"tags": "test-tag-keep-order",
"headers": "",
'fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
res = client.get(
url_for("preview_page", uuid="first"),
follow_redirects=True
)
assert b"And 1 this" in res.data # test-tag-keep-order
a_tag_filter_check = b'And 1 this' #'#only-this' of tag_filters
# check there is no duplication of tag_filters
assert res.data.count(a_tag_filter_check) == 1, f"duplicated filters didn't removed {res.data.count(a_tag_filter_check)} of {a_tag_filter_check} in {res.data=}"
a_filter_check = b"And 13 this" # '/html/body/p[13]'
# check there is no duplication of filters
assert res.data.count(a_filter_check) == 1, f"duplicated filters didn't removed. {res.data.count(a_filter_check)} of {a_filter_check} in {res.data=}"
a_filter_check_not_include = b"And 2 this" # '/html/body/p[2]'
assert a_filter_check_not_include not in res.data
checklist = [
b"And 3 this",
b"And 4 this",
b"And 5 this",
b"And 6 this",
b"And 7 this",
b"And 8 this",
b"And 9 this",
b"And 10 this",
b"And 11 this",
b"And 12 this",
b"And 13 this",
b"And 14 this",
b"And 1 this", # result of filter from tag.
]
# check whether everything a user requested is there
for test in checklist:
assert test in res.data
# check whether everything a user requested is in order of filters.
n = 0
for test in checklist:
t_index = res.data[n:].find(test)
# if the text is not searched, return -1.
assert t_index >= 0, f"""failed because {test=} not in {res.data[n:]=}
#####################
Looks like some feature changed the order of result of filters.
#####################
the {test} appeared before. {test in res.data[:n]=}
{res.data[:n]=}
"""
n += t_index + len(test)
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data

@ -29,7 +29,8 @@ def test_fetch_pdf(client, live_server):
follow_redirects=True
)
assert b'PDF-1.5' not in res.data
# PDF header should not be there (it was converted to text)
assert b'PDF' not in res.data[:10]
assert b'hello world' in res.data
# So we know if the file changes in other ways

@ -242,5 +242,28 @@ def live_server_setup(live_server):
resp.headers['Content-Type'] = 'application/pdf'
return resp
@live_server.app.route('/test-interactive-html-endpoint')
def test_interactive_html_endpoint():
header_text=""
for k,v in request.headers.items():
header_text += f"{k}: {v}<br>"
resp = make_response(f"""
<html>
<body>
Primitive JS check for <pre>changedetectionio/tests/visualselector/test_fetch_data.py</pre>
<p id="remove">This text should be removed</p>
<form onsubmit="event.preventDefault();">
<!-- obfuscated text so that we dont accidentally get a false positive due to conversion of the source :) --->
<button name="test-button" onclick="getElementById('remove').remove();getElementById('some-content').innerHTML = atob('SSBzbWVsbCBKYXZhU2NyaXB0IGJlY2F1c2UgdGhlIGJ1dHRvbiB3YXMgcHJlc3NlZCE=')">Click here</button>
<div id=some-content></div>
<pre>
{header_text.lower()}
</pre>
</body>
</html>""", 200)
resp.headers['Content-Type'] = 'text/html'
return resp
live_server.start()

@ -7,15 +7,19 @@ from ..util import live_server_setup, wait_for_all_checks, extract_UUID_from_cli
def test_setup(client, live_server):
live_server_setup(live_server)
# Add a site in paused mode, add an invalid filter, we should still have visual selector data ready
def test_visual_selector_content_ready(client, live_server):
import os
import json
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
# Add our URL to the import page, because the docker container (playwright/selenium) wont be able to connect to our usual test url
test_url = "https://changedetection.io/ci-test/test-runjs.html"
test_url = url_for('test_interactive_html_endpoint', _external=True)
test_url = test_url.replace('localhost.localdomain', 'cdio')
test_url = test_url.replace('localhost', 'cdio')
res = client.post(
url_for("form_quick_watch_add"),
@ -23,28 +27,31 @@ def test_visual_selector_content_ready(client, live_server):
follow_redirects=True
)
assert b"Watch added in Paused state, saving will unpause" in res.data
uuid = extract_UUID_from_client(client)
res = client.post(
url_for("edit_page", uuid="first", unpause_on_save=1),
url_for("edit_page", uuid=uuid, unpause_on_save=1),
data={
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_webdriver",
'webdriver_js_execute_code': 'document.querySelector("button[name=test-button]").click();'
"url": test_url,
"tags": "",
# For now, cookies doesnt work in headers because it must be a full cookiejar object
'headers': "testheader: yes\buser-agent: MyCustomAgent",
'fetch_backend': "html_webdriver",
},
follow_redirects=True
)
assert b"unpaused" in res.data
wait_for_all_checks(client)
uuid = extract_UUID_from_client(client)
# Check the JS execute code before extract worked
assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)"
res = client.get(
url_for("preview_page", uuid="first"),
url_for("preview_page", uuid=uuid),
follow_redirects=True
)
assert b'I smell JavaScript' in res.data
assert b"testheader: yes" in res.data
assert b"user-agent: mycustomagent" in res.data
assert os.path.isfile(os.path.join('test-datastore', uuid, 'last-screenshot.png')), "last-screenshot.png should exist"
assert os.path.isfile(os.path.join('test-datastore', uuid, 'elements.json')), "xpath elements.json data should exist"
@ -74,30 +81,33 @@ def test_visual_selector_content_ready(client, live_server):
def test_basic_browserstep(client, live_server):
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
#live_server_setup(live_server)
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
# Add our URL to the import page, because the docker container (playwright/selenium) wont be able to connect to our usual test url
test_url = "https://changedetection.io/ci-test/test-runjs.html"
test_url = url_for('test_interactive_html_endpoint', _external=True)
test_url = test_url.replace('localhost.localdomain', 'cdio')
test_url = test_url.replace('localhost', 'cdio')
res = client.post(
url_for("form_quick_watch_add"),
data={"url": test_url, "tags": '', 'edit_and_watch_submit_button': 'Edit > Watch'},
follow_redirects=True
)
assert b"Watch added in Paused state, saving will unpause" in res.data
res = client.post(
url_for("edit_page", uuid="first", unpause_on_save=1),
data={
"url": test_url,
"tags": "",
"headers": "",
'fetch_backend': "html_webdriver",
'browser_steps-0-operation': 'Goto site',
'browser_steps-1-operation': 'Click element',
'browser_steps-1-selector': 'button[name=test-button]',
'browser_steps-1-optional_value': ''
"url": test_url,
"tags": "",
'fetch_backend': "html_webdriver",
'browser_steps-0-operation': 'Goto site',
'browser_steps-1-operation': 'Click element',
'browser_steps-1-selector': 'button[name=test-button]',
'browser_steps-1-optional_value': '',
# For now, cookies doesnt work in headers because it must be a full cookiejar object
'headers': "testheader: yes\buser-agent: MyCustomAgent",
},
follow_redirects=True
)
@ -105,6 +115,9 @@ def test_basic_browserstep(client, live_server):
wait_for_all_checks(client)
uuid = extract_UUID_from_client(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)"
assert b"This text should be removed" not in res.data
# Check HTML conversion detected and workd
res = client.get(
@ -114,13 +127,19 @@ def test_basic_browserstep(client, live_server):
assert b"This text should be removed" not in res.data
assert b"I smell JavaScript because the button was pressed" in res.data
assert b"testheader: yes" in res.data
assert b"user-agent: mycustomagent" in res.data
four_o_four_url = url_for('test_endpoint', status_code=404, _external=True)
four_o_four_url = four_o_four_url.replace('localhost.localdomain', 'cdio')
four_o_four_url = four_o_four_url.replace('localhost', 'cdio')
# now test for 404 errors
res = client.post(
url_for("edit_page", uuid=uuid, unpause_on_save=1),
data={
"url": "https://changedetection.io/404",
"url": four_o_four_url,
"tags": "",
"headers": "",
'fetch_backend': "html_webdriver",
'browser_steps-0-operation': 'Goto site',
'browser_steps-1-operation': 'Click element',

Loading…
Cancel
Save