Detect byte-encoding when the server mishandles the content-type header reply (#472)

pull/476/head
dgtlmoon 3 years ago committed by GitHub
parent 2a03f3f57e
commit 3e8a15456a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1 @@
test-datastore

@ -1,10 +1,12 @@
import os
import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
import chardet
import os
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.proxy import Proxy as SeleniumProxy from selenium.webdriver.common.proxy import Proxy as SeleniumProxy
from selenium.common.exceptions import WebDriverException from selenium.common.exceptions import WebDriverException
import requests
import time
import urllib3.exceptions import urllib3.exceptions
@ -20,7 +22,7 @@ class EmptyReply(Exception):
class Fetcher(): class Fetcher():
error = None error = None
status_code = None status_code = None
content = None # Should always be bytes. content = None
headers = None headers = None
fetcher_description ="No description" fetcher_description ="No description"
@ -146,7 +148,6 @@ class html_requests(Fetcher):
fetcher_description = "Basic fast Plaintext/HTTP Client" fetcher_description = "Basic fast Plaintext/HTTP Client"
def run(self, url, timeout, request_headers, request_body, request_method): def run(self, url, timeout, request_headers, request_body, request_method):
import requests
r = requests.request(method=request_method, r = requests.request(method=request_method,
data=request_body, data=request_body,
@ -155,16 +156,21 @@ class html_requests(Fetcher):
timeout=timeout, timeout=timeout,
verify=False) verify=False)
# https://stackoverflow.com/questions/44203397/python-requests-get-returns-improperly-decoded-text-instead-of-utf-8 # If the response did not tell us what encoding format to expect, Then use chardet to override what `requests` thinks.
# Return bytes here # For example - some sites don't tell us it's utf-8, but return utf-8 content
html = r.text # This seems to not occur when using webdriver/selenium, it seems to detect the text encoding more reliably.
# https://github.com/psf/requests/issues/1604 good info about requests encoding detection
if not r.headers.get('content-type') or not 'charset=' in r.headers.get('content-type'):
encoding = chardet.detect(r.content)['encoding']
if encoding:
r.encoding = encoding
# @todo test this # @todo test this
# @todo maybe you really want to test zero-byte return pages? # @todo maybe you really want to test zero-byte return pages?
if not r or not html or not len(html): if not r or not r.content or not len(r.content):
raise EmptyReply(url=url, status_code=r.status_code) raise EmptyReply(url=url, status_code=r.status_code)
self.status_code = r.status_code self.status_code = r.status_code
self.content = html self.content = r.text
self.headers = r.headers self.headers = r.headers

@ -0,0 +1,87 @@
#!/usr/bin/python3
# coding=utf-8
import time
from flask import url_for
from .util import live_server_setup
import pytest
def test_setup(live_server):
    """Prepare the live test server by delegating to ``live_server_setup``."""
    live_server_setup(live_server)
def set_html_response():
    """Write the non-ASCII HTML fixture to ``test-datastore/endpoint-content.txt``.

    The fixture contains Chinese text so that the encoding-detection tests can
    tell a correct UTF-8 decode apart from a mis-decoded one.
    NOTE(review): presumably this file is what the ``test_endpoint`` route
    serves back - confirm against the test server setup.

    Returns:
        None.

    Raises:
        OSError: if the ``test-datastore`` directory does not exist or the
            file cannot be written.
    """
    test_return_data = """
<html><body><span class="nav_second_img_text">
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;铸大国重器挺制造脊梁致力能源未来赋能美好生活
</span>
</body></html>
"""

    # Pin the on-disk encoding: without it open() falls back to the locale's
    # preferred encoding, which may be unable to encode this text (or encode
    # it as something other than UTF-8) and would make the tests
    # platform-dependent.
    with open("test-datastore/endpoint-content.txt", "w", encoding="utf-8") as f:
        f.write(test_return_data)
def test_check_encoding_detection(client, live_server):
    """Preview must contain correctly decoded text when no ``charset=`` is given.

    The watched URL is built with ``content_type="text/html"``, i.e. without a
    ``charset=`` part.
    NOTE(review): presumably the test endpoint echoes that value as its
    Content-Type response header - confirm against the test server setup.
    """
    set_html_response()

    # Let the endpoint come up before it is fetched
    time.sleep(1)

    # Register the URL through the import page
    endpoint_url = url_for('test_endpoint', content_type="text/html", _external=True)
    import_form = {"urls": endpoint_url}
    client.post(url_for("import_page"), data=import_form, follow_redirects=True)

    # Kick off a check, then wait for the worker thread to pick it up
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(2)

    preview = client.get(url_for("preview_page", uuid="first"), follow_redirects=True)
    body = preview.data

    # The original characters must survive as UTF-8 bytes
    expected_text = "铸大国重".encode('utf-8')
    assert expected_text in body

    # These bytes only show up when the content was decoded with the wrong encoding
    assert b'\xc2\xa7' not in body
def test_check_encoding_detection_missing_content_type_header(client, live_server):
    """Preview must contain correctly decoded text when no content type is requested.

    Unlike ``test_check_encoding_detection``, the watched URL is built without
    any ``content_type`` argument.
    NOTE(review): presumably the test endpoint then omits the Content-Type
    header entirely - confirm against the test server setup.
    """
    set_html_response()

    # Let the endpoint come up before it is fetched
    time.sleep(1)

    # Register the URL through the import page
    endpoint_url = url_for('test_endpoint', _external=True)
    import_form = {"urls": endpoint_url}
    client.post(url_for("import_page"), data=import_form, follow_redirects=True)

    # Kick off a check, then wait for the worker thread to pick it up
    client.get(url_for("api_watch_checknow"), follow_redirects=True)
    time.sleep(2)

    preview = client.get(url_for("preview_page", uuid="first"), follow_redirects=True)
    body = preview.data

    # The original characters must survive as UTF-8 bytes
    expected_text = "铸大国重".encode('utf-8')
    assert expected_text in body

    # These bytes only show up when the content was decoded with the wrong encoding
    assert b'\xc2\xa7' not in body
Loading…
Cancel
Save