more work on fixing headers

2197-browsersteps-headers
dgtlmoon 11 months ago
parent c9a9ed2da8
commit c95561edfb

@ -6,6 +6,8 @@ import re
from random import randint from random import randint
from loguru import logger from loguru import logger
from changedetectionio.content_fetchers.base import manage_user_agent
# Two flags, tell the JS which of the "Selector" or "Value" field should be enabled in the front end # Two flags, tell the JS which of the "Selector" or "Value" field should be enabled in the front end
# 0- off, 1- on # 0- off, 1- on
browser_step_ui_config = {'Choose one': '0 0', browser_step_ui_config = {'Choose one': '0 0',
@ -206,21 +208,18 @@ class browsersteps_live_ui(steppable_browser_interface):
keep_open = 1000 * 60 * 5 keep_open = 1000 * 60 * 5
now = time.time() now = time.time()
# Ask it what the user agent is, if its obviously ChromeHeadless, switch it to the default
from changedetectionio.content_fetchers import manage_user_agent
manage_user_agent(headers=self.headers)
# @todo handle multiple contexts, bind a unique id from the browser on each req? # @todo handle multiple contexts, bind a unique id from the browser on each req?
self.context = self.playwright_browser.new_context( self.context = self.playwright_browser.new_context(
accept_downloads=False, # Should never be needed accept_downloads=False, # Should never be needed
bypass_csp=True, # This is needed to enable JavaScript execution on GitHub and others bypass_csp=True, # This is needed to enable JavaScript execution on GitHub and others
extra_http_headers=self.headers,
ignore_https_errors=True,
proxy=proxy, proxy=proxy,
service_workers=os.getenv('PLAYWRIGHT_SERVICE_WORKERS', 'allow'), # Should be `allow` or `block` - sites like YouTube can transmit large amounts of data via Service Workers service_workers=os.getenv('PLAYWRIGHT_SERVICE_WORKERS', 'allow'),
user_agent=manage_user_agent(headers=self.headers) # Should be `allow` or `block` - sites like YouTube can transmit large amounts of data via Service Workers
user_agent=manage_user_agent(headers=self.headers),
) )
if self.headers:
self.context.set_extra_http_headers(self.headers)
self.page = self.context.new_page() self.page = self.context.new_page()

@ -29,7 +29,8 @@ def available_fetchers():
# rather than site-specific. # rather than site-specific.
use_playwright_as_chrome_fetcher = os.getenv('PLAYWRIGHT_DRIVER_URL', False) use_playwright_as_chrome_fetcher = os.getenv('PLAYWRIGHT_DRIVER_URL', False)
if use_playwright_as_chrome_fetcher: if use_playwright_as_chrome_fetcher:
if not strtobool(os.getenv('FAST_PUPPETEER_CHROME_FETCHER', 'False')): # @note - For now, browser steps always uses playwright
if not strtobool(os.getenv('FAST_PUPPETEER_CHROME_FETCHER', 'False')) or False:
logger.debug('Using Playwright library as fetcher') logger.debug('Using Playwright library as fetcher')
from .playwright import fetcher as html_webdriver from .playwright import fetcher as html_webdriver
else: else:

@ -5,6 +5,40 @@ from loguru import logger
from changedetectionio.content_fetchers import BrowserStepsStepException from changedetectionio.content_fetchers import BrowserStepsStepException
def manage_user_agent(headers, current_ua=''):
"""
Basic setting of user-agent
NOTE!!!!!! The service that does the actual Chrome fetching should handle any anti-robot techniques
THERE ARE MANY WAYS THAT IT CAN BE DETECTED AS A ROBOT!!
This does not take care of
- Scraping of 'navigator' (platform, productSub, vendor, oscpu etc etc) browser object (navigator.appVersion) etc
- TCP/IP fingerprint JA3 etc
- Graphic rendering fingerprinting
- Your IP being obviously in a pool of bad actors
- Too many requests
- Scraping of SCH-UA browser replies (thanks google!!)
- Scraping of ServiceWorker, new window calls etc
See https://filipvitas.medium.com/how-to-set-user-agent-header-with-puppeteer-js-and-not-fail-28c7a02165da
Puppeteer requests https://github.com/dgtlmoon/pyppeteerstealth
:param page:
:param headers:
:return:
"""
# Ask it what the user agent is, if its obviously ChromeHeadless, switch it to the default
ua_in_custom_headers = next((v for k, v in headers.items() if k.lower() == "user-agent"), None)
if ua_in_custom_headers:
return ua_in_custom_headers
if not ua_in_custom_headers and current_ua:
current_ua = current_ua.replace('HeadlessChrome', 'Chrome')
return current_ua
return None
class Fetcher(): class Fetcher():
browser_connection_is_custom = None browser_connection_is_custom = None
browser_connection_url = None browser_connection_url = None
@ -76,39 +110,6 @@ class Fetcher():
""" """
return {k.lower(): v for k, v in self.headers.items()} return {k.lower(): v for k, v in self.headers.items()}
def manage_user_agent(self, headers, current_ua=''):
"""
Basic setting of user-agent
NOTE!!!!!! The service that does the actual Chrome fetching should handle any anti-robot techniques
THERE ARE MANY WAYS THAT IT CAN BE DETECTED AS A ROBOT!!
This does not take care of
- Scraping of 'navigator' (platform, productSub, vendor, oscpu etc etc) browser object (navigator.appVersion) etc
- TCP/IP fingerprint JA3 etc
- Graphic rendering fingerprinting
- Your IP being obviously in a pool of bad actors
- Too many requests
- Scraping of SCH-UA browser replies (thanks google!!)
- Scraping of ServiceWorker, new window calls etc
See https://filipvitas.medium.com/how-to-set-user-agent-header-with-puppeteer-js-and-not-fail-28c7a02165da
Puppeteer requests https://github.com/dgtlmoon/pyppeteerstealth
:param page:
:param headers:
:return:
"""
# Ask it what the user agent is, if its obviously ChromeHeadless, switch it to the default
ua_in_custom_headers = next((v for k, v in headers.items() if k.lower() == "user-agent"), None)
if ua_in_custom_headers:
return ua_in_custom_headers
if not ua_in_custom_headers and current_ua:
current_ua = current_ua.replace('HeadlesssChrome', 'Chrome')
return current_ua
return None
def browser_steps_get_valid_steps(self): def browser_steps_get_valid_steps(self):
if self.browser_steps is not None and len(self.browser_steps): if self.browser_steps is not None and len(self.browser_steps):
valid_steps = filter( valid_steps = filter(

@ -4,7 +4,7 @@ from urllib.parse import urlparse
from loguru import logger from loguru import logger
from changedetectionio.content_fetchers.base import Fetcher from changedetectionio.content_fetchers.base import Fetcher, manage_user_agent
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, ScreenshotUnavailable from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, ScreenshotUnavailable
class fetcher(Fetcher): class fetcher(Fetcher):
@ -105,16 +105,15 @@ class fetcher(Fetcher):
context = browser.new_context( context = browser.new_context(
accept_downloads=False, # Should never be needed accept_downloads=False, # Should never be needed
bypass_csp=True, # This is needed to enable JavaScript execution on GitHub and others bypass_csp=True, # This is needed to enable JavaScript execution on GitHub and others
extra_http_headers=request_headers,
ignore_https_errors=True,
proxy=self.proxy, proxy=self.proxy,
service_workers=os.getenv('PLAYWRIGHT_SERVICE_WORKERS', 'allow'), # Should be `allow` or `block` - sites like YouTube can transmit large amounts of data via Service Workers service_workers=os.getenv('PLAYWRIGHT_SERVICE_WORKERS', 'allow'), # Should be `allow` or `block` - sites like YouTube can transmit large amounts of data via Service Workers
user_agent = self.manage_user_agent(headers=request_headers), user_agent=manage_user_agent(headers=request_headers),
) )
self.page = context.new_page() self.page = context.new_page()
if len(request_headers):
context.set_extra_http_headers(request_headers)
# Listen for all console events and handle errors # Listen for all console events and handle errors
self.page.on("console", lambda msg: print(f"Playwright console: Watch URL: {url} {msg.type}: {msg.text} {msg.args}")) self.page.on("console", lambda msg: print(f"Playwright console: Watch URL: {url} {msg.type}: {msg.text} {msg.args}"))

@ -6,7 +6,7 @@ from urllib.parse import urlparse
from loguru import logger from loguru import logger
from changedetectionio.content_fetchers.base import Fetcher from changedetectionio.content_fetchers.base import Fetcher, manage_user_agent
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, BrowserFetchTimedOut, BrowserConnectError from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, BrowserFetchTimedOut, BrowserConnectError
@ -101,7 +101,7 @@ class fetcher(Fetcher):
else: else:
self.page = await browser.newPage() self.page = await browser.newPage()
await self.page.setUserAgent(self.manage_user_agent(headers=request_headers, current_ua=self.page.evaluate('navigator.userAgent'))) await self.page.setUserAgent(manage_user_agent(headers=request_headers, current_ua=await self.page.evaluate('navigator.userAgent')))
await self.page.setBypassCSP(True) await self.page.setBypassCSP(True)
if request_headers: if request_headers:

@ -29,7 +29,8 @@ def test_fetch_pdf(client, live_server):
follow_redirects=True follow_redirects=True
) )
assert b'PDF-1.5' not in res.data # PDF header should not be there (it was converted to text)
assert b'PDF' not in res.data[:10]
assert b'hello world' in res.data assert b'hello world' in res.data
# So we know if the file changes in other ways # So we know if the file changes in other ways

@ -242,5 +242,29 @@ def live_server_setup(live_server):
resp.headers['Content-Type'] = 'application/pdf' resp.headers['Content-Type'] = 'application/pdf'
return resp return resp
@live_server.app.route('/test-interactive-html-endpoint')
def test_interactive_html_endpoint():
import json
header_text=""
for k,v in request.headers.items():
header_text += f"{k}: {v}<br>"
resp = make_response(f"""
<html>
<body>
Primitive JS check for <pre>changedetectionio/tests/visualselector/test_fetch_data.py</pre>
<p id="remove">This text should be removed</p>
<form onsubmit="event.preventDefault();">
<!-- obfuscated text so that we dont accidentally get a false positive due to conversion of the source :) --->
<button name="test-button" onclick="getElementById('remove').remove();getElementById('some-content').innerHTML = atob('SSBzbWVsbCBKYXZhU2NyaXB0IGJlY2F1c2UgdGhlIGJ1dHRvbiB3YXMgcHJlc3NlZCE=')">Click here</button>
<div id=some-content></div>
<pre>
{header_text.lower()}
</pre>
</body>
</html>""", 200)
resp.headers['Content-Type'] = 'text/html'
return resp
live_server.start() live_server.start()

@ -15,7 +15,9 @@ def test_visual_selector_content_ready(client, live_server):
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test" assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
# Add our URL to the import page, because the docker container (playwright/selenium) wont be able to connect to our usual test url # Add our URL to the import page, because the docker container (playwright/selenium) wont be able to connect to our usual test url
test_url = "https://changedetection.io/ci-test/test-runjs.html" test_url = url_for('test_interactive_html_endpoint', _external=True)
test_url = test_url.replace('localhost.localdomain', 'cdio')
test_url = test_url.replace('localhost', 'cdio')
res = client.post( res = client.post(
url_for("form_quick_watch_add"), url_for("form_quick_watch_add"),
@ -37,7 +39,9 @@ def test_visual_selector_content_ready(client, live_server):
) )
assert b"unpaused" in res.data assert b"unpaused" in res.data
wait_for_all_checks(client) wait_for_all_checks(client)
uuid = extract_UUID_from_client(client) uuid = extract_UUID_from_client(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)"
# Check the JS execute code before extract worked # Check the JS execute code before extract worked
res = client.get( res = client.get(
@ -74,11 +78,13 @@ def test_visual_selector_content_ready(client, live_server):
def test_basic_browserstep(client, live_server): def test_basic_browserstep(client, live_server):
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
#live_server_setup(live_server) #live_server_setup(live_server)
# Add our URL to the import page, because the docker container (playwright/selenium) wont be able to connect to our usual test url assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
test_url = "https://changedetection.io/ci-test/test-runjs.html"
test_url = url_for('test_interactive_html_endpoint', _external=True)
test_url = test_url.replace('localhost.localdomain', 'cdio')
test_url = test_url.replace('localhost', 'cdio')
res = client.post( res = client.post(
url_for("form_quick_watch_add"), url_for("form_quick_watch_add"),
@ -97,7 +103,8 @@ def test_basic_browserstep(client, live_server):
'browser_steps-1-operation': 'Click element', 'browser_steps-1-operation': 'Click element',
'browser_steps-1-selector': 'button[name=test-button]', 'browser_steps-1-selector': 'button[name=test-button]',
'browser_steps-1-optional_value': '', 'browser_steps-1-optional_value': '',
'headers': "cOoKiE: notice-apa=1; test-value=1; " # For now, cookies doesnt work in headers because it must be a full cookiejar object
'headers': "testheader: yes\buser-agent: MyCustomAgent",
}, },
follow_redirects=True follow_redirects=True
) )
@ -105,6 +112,9 @@ def test_basic_browserstep(client, live_server):
wait_for_all_checks(client) wait_for_all_checks(client)
uuid = extract_UUID_from_client(client) uuid = extract_UUID_from_client(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)"
assert b"This text should be removed" not in res.data
# Check HTML conversion detected and workd # Check HTML conversion detected and workd
res = client.get( res = client.get(
@ -113,16 +123,20 @@ def test_basic_browserstep(client, live_server):
) )
assert b"This text should be removed" not in res.data assert b"This text should be removed" not in res.data
assert b"I smell JavaScript because the button was pressed" in res.data assert b"I smell JavaScript because the button was pressed" in res.data
# The JS on the page will set this if the cookie (and thus headers) was handled
assert b"test-value in headers found" in res.data assert b"testheader: yes" in res.data
assert b"user-agent: mycustomagent" in res.data
four_o_four_url = url_for('test_endpoint', status_code=404, _external=True)
four_o_four_url = four_o_four_url.replace('localhost.localdomain', 'cdio')
four_o_four_url = four_o_four_url.replace('localhost', 'cdio')
# now test for 404 errors # now test for 404 errors
res = client.post( res = client.post(
url_for("edit_page", uuid=uuid, unpause_on_save=1), url_for("edit_page", uuid=uuid, unpause_on_save=1),
data={ data={
"url": "https://changedetection.io/404", "url": four_o_four_url,
"tags": "", "tags": "",
"headers": "",
'fetch_backend': "html_webdriver", 'fetch_backend': "html_webdriver",
'browser_steps-0-operation': 'Goto site', 'browser_steps-0-operation': 'Goto site',
'browser_steps-1-operation': 'Click element', 'browser_steps-1-operation': 'Click element',

Loading…
Cancel
Save