#!/usr/bin/python3
from flask import make_response, request
from flask import url_for
import logging
import time
def set_original_response():
    """Reset the shared endpoint fixture file to the baseline body.

    Overwrites ``test-datastore/endpoint-content.txt``, the same file the
    ``/test-endpoint`` route in this module reads back when no explicit
    ``content`` query argument is supplied.

    Returns:
        None.
    """
    # The body lines sit at column 0 on purpose: they are part of the
    # literal and any indentation would end up in the served content.
    baseline_body = """
head title
Some initial text
Which is across multiple lines
So let's see what happens.
"""
    with open("test-datastore/endpoint-content.txt", "w") as endpoint_file:
        endpoint_file.write(baseline_body)
def set_modified_response():
    """Replace the shared endpoint fixture file with a modified body.

    Compared with the baseline body, the title line is prefixed with
    ``modified`` and the second text line is swapped for a new one.
    Overwrites ``test-datastore/endpoint-content.txt``.

    Returns:
        None.
    """
    # Column-0 lines are part of the literal; do not indent them.
    changed_body = """
modified head title
Some initial text
which has this one new line
So let's see what happens.
"""
    with open("test-datastore/endpoint-content.txt", "w") as endpoint_file:
        endpoint_file.write(changed_body)
def set_longer_modified_response():
    """Replace the shared endpoint fixture file with a longer modified body.

    Same opening lines as the modified body, but the closing sentence is
    repeated four times so the content is noticeably longer.
    Overwrites ``test-datastore/endpoint-content.txt``.

    Returns:
        None.
    """
    # Column-0 lines are part of the literal; do not indent them.
    longer_body = """
modified head title
Some initial text
which has this one new line
So let's see what happens.
So let's see what happens.
So let's see what happens.
So let's see what happens.
"""
    with open("test-datastore/endpoint-content.txt", "w") as endpoint_file:
        endpoint_file.write(longer_body)
def set_more_modified_response():
    """Replace the shared endpoint fixture file with a further-modified body.

    Same as the modified body with one extra trailing line
    (``Ohh yeah awesome``). Overwrites
    ``test-datastore/endpoint-content.txt``.

    Returns:
        None.
    """
    # Column-0 lines are part of the literal; do not indent them.
    extended_body = """
modified head title
Some initial text
which has this one new line
So let's see what happens.
Ohh yeah awesome
"""
    with open("test-datastore/endpoint-content.txt", "w") as endpoint_file:
        endpoint_file.write(extended_body)
# kinda funky, but works for now
def extract_api_key_from_UI(client):
    """Fetch the settings page and pull the API key out of its body.

    Args:
        client: Test client used to GET ``url_for("settings_page")``.

    Returns:
        The first regex capture group from the stringified response body,
        with surrounding whitespace stripped.

    Raises:
        AttributeError: If the pattern does not match (``re.search``
            returns ``None``).
    """
    import re

    settings_response = client.get(url_for("settings_page"))
    page_text = str(settings_response.data)

    # NOTE(review): the pattern has no literal text around the group, so as
    # written it captures only the first non-newline character of the page
    # text. Presumably it should be anchored on the markup that wraps the
    # rendered {{api_key}} value - confirm against the settings template.
    key_match = re.search('(.+?)', page_text)
    return key_match.group(1).strip()
# kinda funky, but works for now
def get_UUID_for_tag_name(client, name):
    """Return the UUID of the first tag whose title matches ``name``.

    The comparison ignores case and surrounding whitespace on BOTH sides.
    Previously only the stored title was normalised, so a lookup such as
    ``"My Tag"`` could never match even though the stored title was
    lower-cased before comparing.

    Args:
        client: Test client; ``client.application.config.get('DATASTORE').data``
            must be a dict with a ``['settings']['application']`` mapping.
        name: Tag title to look for.

    Returns:
        The UUID key of the first matching tag, or ``None`` when no tag
        matches or no ``tags`` mapping is present.
    """
    # Non-string names are compared as-is so they keep yielding "no match"
    # instead of raising on .lower().
    wanted = name.lower().strip() if isinstance(name, str) else name

    app_config = client.application.config.get('DATASTORE').data
    tags = app_config['settings']['application'].get('tags', {})

    for uuid, tag in tags.items():
        # A missing or None title is treated as an empty title.
        title = tag.get('title') or ''
        if wanted == title.lower().strip():
            return uuid
    return None
# kinda funky, but works for now
def extract_rss_token_from_UI(client):
    """Fetch the index page and pull the RSS token out of its body.

    Args:
        client: Test client used to GET ``url_for("index")``.

    Returns:
        The text between the first ``token=`` and the next double quote in
        the stringified response body, with surrounding whitespace stripped.

    Raises:
        AttributeError: If the page contains no ``token=..."`` sequence
            (``re.search`` returns ``None``).
    """
    import re

    index_response = client.get(url_for("index"))
    page_text = str(index_response.data)

    # Non-greedy group: stop at the first closing quote after "token=".
    token_match = re.search('token=(.+?)"', page_text)
    return token_match.group(1).strip()
# kinda funky, but works for now
def extract_UUID_from_client(client):
    """Fetch the index page and return the first UUID found in an edit link.

    Args:
        client: Test client used to GET ``url_for("index")``.

    Returns:
        The text following the first ``edit/`` in the stringified response
        body, up to (not including) the next ``#`` or double quote, with
        surrounding whitespace stripped.

    Raises:
        AttributeError: If the page contains no ``edit/...`` link
            (``re.search`` returns ``None``).
    """
    import re

    index_response = client.get(url_for("index"))
    page_text = str(index_response.data)

    # The link may end in a fragment (#...) or the closing attribute quote.
    link_match = re.search('edit/(.+?)[#"]', page_text)
    return link_match.group(1).strip()
def wait_for_all_checks(client):
    """Block until the application's check queue reports empty.

    Polls the ``get_queue_size`` view function every 0.1 seconds for up to
    roughly 30 seconds, then sleeps a further 0.2 seconds before returning.

    Args:
        client: Test client; ``client.application.view_functions`` must
            contain a ``'get_queue_size'`` callable that takes no arguments.

    Returns:
        None. Also returns (without raising) if the queue never empties
        within the polling window.
    """
    started_at = time.time()

    while time.time() - started_at <= 30:
        time.sleep(0.1)
        queue_size = client.application.view_functions['get_queue_size']()
        if queue_size:
            logging.getLogger().info(f"Waiting for queue to be empty, queue size {queue_size} - {time.time() - started_at}")
            continue
        break

    # An empty queue does not mean idle: an item may already have been taken
    # off the queue and still be processing, so allow it time to finish.
    time.sleep(0.2)
def live_server_setup(live_server):
@live_server.app.route('/test-random-content-endpoint')
def test_random_content_endpoint():
import secrets
return "Random content - {}\n".format(secrets.token_hex(64))
@live_server.app.route('/test-endpoint')
def test_endpoint():
ctype = request.args.get('content_type')
status_code = request.args.get('status_code')
content = request.args.get('content') or None
# Used to just try to break the header detection
uppercase_headers = request.args.get('uppercase_headers')
try:
if content is not None:
resp = make_response(content, status_code)
if uppercase_headers:
ctype=ctype.upper()
resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html'
else:
resp.headers['Content-Type'] = ctype if ctype else 'text/html'
return resp
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open("test-datastore/endpoint-content.txt", "r") as f:
resp = make_response(f.read(), status_code)
if uppercase_headers:
resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html'
else:
resp.headers['Content-Type'] = ctype if ctype else 'text/html'
return resp
except FileNotFoundError:
return make_response('', status_code)
# Just return the headers in the request
@live_server.app.route('/test-headers')
def test_headers():
output = []
for header in request.headers:
output.append("{}:{}".format(str(header[0]), str(header[1])))
content = "\n".join(output)
resp = make_response(content, 200)
resp.headers['server'] = 'custom'
return resp
# Just return the body in the request
@live_server.app.route('/test-body', methods=['POST', 'GET'])
def test_body():
print ("TEST-BODY GOT", request.data, "returning")
return request.data
# Just return the verb in the request
@live_server.app.route('/test-method', methods=['POST', 'GET', 'PATCH'])
def test_method():
return request.method
# Where we POST to as a notification
@live_server.app.route('/test_notification_endpoint', methods=['POST', 'GET'])
def test_notification_endpoint():
with open("test-datastore/notification.txt", "wb") as f:
# Debug method, dump all POST to file also, used to prove #65
data = request.stream.read()
if data != None:
f.write(data)
with open("test-datastore/notification-url.txt", "w") as f:
f.write(request.url)
with open("test-datastore/notification-headers.txt", "w") as f:
f.write(str(request.headers))
if request.content_type:
with open("test-datastore/notification-content-type.txt", "w") as f:
f.write(request.content_type)
print("\n>> Test notification endpoint was hit.\n", data)
return "Text was set"
# Just return the verb in the request
@live_server.app.route('/test-basicauth', methods=['GET'])
def test_basicauth_method():
auth = request.authorization
ret = " ".join([auth.username, auth.password, auth.type])
return ret
# Just return some GET var
@live_server.app.route('/test-return-query', methods=['GET'])
def test_return_query():
return request.query_string
@live_server.app.route('/endpoint-test.pdf')
def test_pdf_endpoint():
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open("test-datastore/endpoint-test.pdf", "rb") as f:
resp = make_response(f.read(), 200)
resp.headers['Content-Type'] = 'application/pdf'
return resp
@live_server.app.route('/test-interactive-html-endpoint')
def test_interactive_html_endpoint():
header_text=""
for k,v in request.headers.items():
header_text += f"{k}: {v}
"
resp = make_response(f"""
Primitive JS check for changedetectionio/tests/visualselector/test_fetch_data.py
This text should be removed