From 3e8a15456a117e63f0e463434a83585585f20c9d Mon Sep 17 00:00:00 2001 From: dgtlmoon Date: Thu, 17 Mar 2022 10:28:02 +0100 Subject: [PATCH] Detect byte-encoding when the server mishandles the content-type header reply (#472) --- changedetectionio/.gitignore | 1 + changedetectionio/content_fetcher.py | 24 ++++--- changedetectionio/tests/test_encoding.py | 87 ++++++++++++++++++++++++ 3 files changed, 103 insertions(+), 9 deletions(-) create mode 100644 changedetectionio/.gitignore create mode 100644 changedetectionio/tests/test_encoding.py diff --git a/changedetectionio/.gitignore b/changedetectionio/.gitignore new file mode 100644 index 00000000..1d463784 --- /dev/null +++ b/changedetectionio/.gitignore @@ -0,0 +1 @@ +test-datastore diff --git a/changedetectionio/content_fetcher.py b/changedetectionio/content_fetcher.py index 890ff65d..df374101 100644 --- a/changedetectionio/content_fetcher.py +++ b/changedetectionio/content_fetcher.py @@ -1,10 +1,12 @@ -import os -import time from abc import ABC, abstractmethod +import chardet +import os from selenium import webdriver from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from selenium.webdriver.common.proxy import Proxy as SeleniumProxy from selenium.common.exceptions import WebDriverException +import requests +import time import urllib3.exceptions @@ -20,7 +22,7 @@ class EmptyReply(Exception): class Fetcher(): error = None status_code = None - content = None # Should always be bytes. + content = None headers = None fetcher_description ="No description" @@ -146,7 +148,6 @@ class html_requests(Fetcher): fetcher_description = "Basic fast Plaintext/HTTP Client" def run(self, url, timeout, request_headers, request_body, request_method): - import requests r = requests.request(method=request_method, data=request_body, @@ -155,16 +156,21 @@ class html_requests(Fetcher): timeout=timeout, verify=False) - # https://stackoverflow.com/questions/44203397/python-requests-get-returns-improperly-decoded-text-instead-of-utf-8 - # Return bytes here - html = r.text + # If the response did not tell us what encoding format to expect, Then use chardet to override what `requests` thinks. + # For example - some sites don't tell us it's utf-8, but return utf-8 content + # This seems to not occur when using webdriver/selenium, it seems to detect the text encoding more reliably. + # https://github.com/psf/requests/issues/1604 good info about requests encoding detection + if not r.headers.get('content-type') or not 'charset=' in r.headers.get('content-type'): + encoding = chardet.detect(r.content)['encoding'] + if encoding: + r.encoding = encoding # @todo test this # @todo maybe you really want to test zero-byte return pages? - if not r or not html or not len(html): + if not r or not r.content or not len(r.content): raise EmptyReply(url=url, status_code=r.status_code) self.status_code = r.status_code - self.content = html + self.content = r.text self.headers = r.headers diff --git a/changedetectionio/tests/test_encoding.py b/changedetectionio/tests/test_encoding.py new file mode 100644 index 00000000..58150135 --- /dev/null +++ b/changedetectionio/tests/test_encoding.py @@ -0,0 +1,87 @@ +#!/usr/bin/python3 +# coding=utf-8 + +import time +from flask import url_for +from .util import live_server_setup +import pytest + + +def test_setup(live_server): + live_server_setup(live_server) + + +def set_html_response(): + test_return_data = """ + +        铸大国重器,挺制造脊梁,致力能源未来,赋能美好生活。 + + + """ + with open("test-datastore/endpoint-content.txt", "w") as f: + f.write(test_return_data) + return None + + +# In the case the server does not issue a charset= or doesnt have content_type header set +def test_check_encoding_detection(client, live_server): + set_html_response() + + # Give the endpoint time to spin up + time.sleep(1) + + # Add our URL to the import page + test_url = url_for('test_endpoint', content_type="text/html", _external=True) + client.post( + url_for("import_page"), + data={"urls": test_url}, + follow_redirects=True + ) + + # Trigger a check + client.get(url_for("api_watch_checknow"), follow_redirects=True) + + # Give the thread time to pick it up + time.sleep(2) + + res = client.get( + url_for("preview_page", uuid="first"), + follow_redirects=True + ) + + # Should see the proper string + assert "铸大国重".encode('utf-8') in res.data + # Should not see the failed encoding + assert b'\xc2\xa7' not in res.data + + +# In the case the server does not issue a charset= or doesnt have content_type header set +def test_check_encoding_detection_missing_content_type_header(client, live_server): + set_html_response() + + # Give the endpoint time to spin up + time.sleep(1) + + # Add our URL to the import page + test_url = url_for('test_endpoint', _external=True) + client.post( + url_for("import_page"), + data={"urls": test_url}, + follow_redirects=True + ) + + # Trigger a check + client.get(url_for("api_watch_checknow"), follow_redirects=True) + + # Give the thread time to pick it up + time.sleep(2) + + res = client.get( + url_for("preview_page", uuid="first"), + follow_redirects=True + ) + + # Should see the proper string + assert "铸大国重".encode('utf-8') in res.data + # Should not see the failed encoding + assert b'\xc2\xa7' not in res.data