Issue #14 - Tweaks to edit, create ignore text, tests for ignore text, integrate ignore text

pull/15/head
Leigh Morresi 4 years ago
parent 0855017dca
commit 468184bc3a

@ -151,13 +151,16 @@ def changedetection_app(config=None, datastore_o=None):
return render_template("scrub.html") return render_template("scrub.html")
@app.route("/edit", methods=['GET', 'POST']) @app.route("/edit/<string:uuid>", methods=['GET', 'POST'])
def edit_page(): def edit_page(uuid):
global messages global messages
import validators import validators
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
if request.method == 'POST': if request.method == 'POST':
uuid = request.args.get('uuid')
url = request.form.get('url').strip() url = request.form.get('url').strip()
tag = request.form.get('tag').strip() tag = request.form.get('tag').strip()
@ -172,10 +175,27 @@ def changedetection_app(config=None, datastore_o=None):
if len(parts) == 2: if len(parts) == 2:
extra_headers.update({parts[0].strip(): parts[1].strip()}) extra_headers.update({parts[0].strip(): parts[1].strip()})
update_obj = {'url': url,
'tag': tag,
'headers': extra_headers
}
# Ignore text
form_ignore_text = request.form.get('ignore-text').strip()
ignore_text = []
if form_ignore_text:
for text in form_ignore_text.split("\n"):
text = text.strip()
if len(text):
ignore_text.append(text)
# Reset the previous_md5 so we process a new snapshot including stripping ignore text.
update_obj['previous_md5'] = ""
update_obj['ignore_text'] = ignore_text
validators.url(url) # @todo switch to prop/attr/observer validators.url(url) # @todo switch to prop/attr/observer
datastore.data['watching'][uuid].update({'url': url, datastore.data['watching'][uuid].update(update_obj)
'tag': tag,
'headers': extra_headers})
datastore.needs_write = True datastore.needs_write = True
messages.append({'class': 'ok', 'message': 'Updated watch.'}) messages.append({'class': 'ok', 'message': 'Updated watch.'})
@ -183,8 +203,6 @@ def changedetection_app(config=None, datastore_o=None):
return redirect(url_for('index')) return redirect(url_for('index'))
else: else:
uuid = request.args.get('uuid')
output = render_template("edit.html", uuid=uuid, watch=datastore.data['watching'][uuid], messages=messages) output = render_template("edit.html", uuid=uuid, watch=datastore.data['watching'][uuid], messages=messages)
return output return output

@ -11,6 +11,15 @@ class perform_site_check():
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.datastore = datastore self.datastore = datastore
def strip_ignore_text(self, content, list_ignore_text):
output=[]
for line in content.splitlines():
if not any(skip_text in line for skip_text in list_ignore_text):
output.append(line)
return "\n".join(output)
def run(self, uuid): def run(self, uuid):
timestamp = int(time.time()) # used for storage etc too timestamp = int(time.time()) # used for storage etc too
stripped_text_from_html = False stripped_text_from_html = False
@ -76,7 +85,16 @@ class perform_site_check():
if not len(r.text): if not len(r.text):
update_obj["last_error"] = "Empty reply" update_obj["last_error"] = "Empty reply"
fetched_md5 = hashlib.md5(stripped_text_from_html.encode('utf-8')).hexdigest() content = stripped_text_from_html.encode('utf-8')
# If there's text to skip
# @todo we could abstract out the get_text() to handle this cleaner
if len(self.datastore.data['watching'][uuid]['ignore_text']):
content = self.strip_ignore_text(content, self.datastore.data['watching'][uuid]['ignore_text'])
fetched_md5 = hashlib.md5(content).hexdigest()
# could be None or False depending on JSON type # could be None or False depending on JSON type
if self.datastore.data['watching'][uuid]['previous_md5'] != fetched_md5: if self.datastore.data['watching'][uuid]['previous_md5'] != fetched_md5:

@ -1,2 +1,4 @@
[pytest] [pytest]
addopts = --no-start-live-server --live-server-port=5005 addopts = --no-start-live-server --live-server-port=5005
live_server_scope = function

@ -53,7 +53,8 @@ class ChangeDetectionStore:
'previous_md5': "", 'previous_md5': "",
'uuid': str(uuid_builder.uuid4()), 'uuid': str(uuid_builder.uuid4()),
'headers': {}, # Extra headers to send 'headers': {}, # Extra headers to send
'history': {} # Dict of timestamp and output stripped filename 'history': {}, # Dict of timestamp and output stripped filename
'ignore_text': [] # List of text to ignore when calculating the comparison checksum
} }
if path.isfile('/source.txt'): if path.isfile('/source.txt'):

@ -18,10 +18,26 @@
<span class="pure-form-message-inline">Grouping tags, can be a comma separated list.</span> <span class="pure-form-message-inline">Grouping tags, can be a comma separated list.</span>
</div> </div>
<!-- @todo: move to tabs --->
<fieldset class="pure-group">
<label for="ignore-text">Ignore text</label>
<textarea id="ignore-text" name="ignore-text" class="pure-input-1-2" placeholder=""
style="width: 100%;
font-family:monospace;
white-space: pre;
overflow-wrap: normal;
overflow-x: scroll;" rows="5">{% for value in watch.ignore_text %}{{ value }}
{% endfor %}</textarea>
<span class="pure-form-message-inline">Each line will be processed separately as an ignore rule.</span>
</fieldset>
<!-- @todo: move to tabs --->
<fieldset class="pure-group"> <fieldset class="pure-group">
<label for="headers">Extra request headers</label> <label for="headers">Extra request headers</label>
<textarea id=headers name="headers" class="pure-input-1-2" placeholder="Example <textarea id="headers" name="headers" class="pure-input-1-2" placeholder="Example
Cookie: foobar Cookie: foobar
User-Agent: wonderbra 1.0" User-Agent: wonderbra 1.0"
style="width: 100%; style="width: 100%;
@ -33,6 +49,8 @@ User-Agent: wonderbra 1.0"
<br/> <br/>
</fieldset> </fieldset>
<div class="pure-control-group"> <div class="pure-control-group">
<button type="submit" class="pure-button pure-button-primary">Save</button> <button type="submit" class="pure-button pure-button-primary">Save</button>
</div> </div>

@ -43,7 +43,8 @@
<tr id="{{ watch.uuid }}" <tr id="{{ watch.uuid }}"
class="{{ loop.cycle('pure-table-odd', 'pure-table-even') }} class="{{ loop.cycle('pure-table-odd', 'pure-table-even') }}
{% if watch.last_error is defined and watch.last_error != False %}error{% endif %} {% if watch.last_error is defined and watch.last_error != False %}error{% endif %}
{% if watch.newest_history_key| int > watch.last_viewed| int %}unviewed{% endif %}"> {% if watch.newest_history_key| int > watch.last_viewed| int %}unviewed{% endif %}
">
<td>{{ loop.index }}</td> <td>{{ loop.index }}</td>
<td class="title-col">{{watch.title if watch.title is not none else watch.url}} <td class="title-col">{{watch.title if watch.title is not none else watch.url}}
<a class="external" target=_blank href="{{ watch.url }}"></a> <a class="external" target=_blank href="{{ watch.url }}"></a>
@ -64,7 +65,7 @@
<td> <td>
<a href="/api/checknow?uuid={{ watch.uuid}}{% if request.args.get('tag') %}&tag={{request.args.get('tag')}}{% endif %}" <a href="/api/checknow?uuid={{ watch.uuid}}{% if request.args.get('tag') %}&tag={{request.args.get('tag')}}{% endif %}"
class="pure-button button-small pure-button-primary">Recheck</a> class="pure-button button-small pure-button-primary">Recheck</a>
<a href="/edit?uuid={{ watch.uuid}}" class="pure-button button-small pure-button-primary">Edit</a> <a href="/edit/{{ watch.uuid}}" class="pure-button button-small pure-button-primary">Edit</a>
{% if watch.history|length >= 2 %} {% if watch.history|length >= 2 %}
<a href="/diff/{{ watch.uuid}}" class="pure-button button-small pure-button-primary">Diff</a> <a href="/diff/{{ watch.uuid}}" class="pure-button button-small pure-button-primary">Diff</a>
{% endif %} {% endif %}

@ -13,7 +13,6 @@ import os
global app global app
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def app(request): def app(request):
"""Create application for the tests.""" """Create application for the tests."""
@ -37,7 +36,12 @@ def app(request):
def teardown(): def teardown():
datastore.stop_thread = True datastore.stop_thread = True
app.config['STOP_THREADS'] = True app.config['STOP_THREADS'] = True
try:
os.unlink("{}/url-watches.json".format(datastore_path))
except FileNotFoundError:
# This is fine in the case of a failure.
pass
request.addfinalizer(teardown)
request.addfinalizer(teardown)
return app return app

@ -0,0 +1,135 @@
#!/usr/bin/python3
import time
from flask import url_for
from urllib.request import urlopen
# Unit test of the stripper
def test_strip_text_func():
from backend import fetch_site_status
test_content = """
Some content
is listed here
but sometimes we want to remove the lines.
but not always."""
original_length = len(test_content.splitlines())
fetcher = fetch_site_status.perform_site_check(datastore=False)
ignore_lines = ["sometimes"]
stripped_content = fetcher.strip_ignore_text(test_content, ignore_lines)
# Should be one line shorter
assert len(stripped_content.splitlines()) == original_length - 1
assert "sometimes" not in stripped_content
assert "Some content" in stripped_content
def set_original_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
</br>
So let's see what happens. </br>
</body>
</html>
"""
with open("test-datastore/output.txt", "w") as f:
f.write(test_return_data)
# Is the same but includes ZZZZZ, 'ZZZZZ' is the last line in ignore_text
def set_modified_response():
test_return_data = """<html>
<body>
Some initial text</br>
<p>Which is across multiple lines</p>
<P>ZZZZZ</P>
</br>
So let's see what happens. </br>
</body>
</html>
"""
with open("test-datastore/output.txt", "w") as f:
f.write(test_return_data)
def test_check_ignore_text_functionality(client, live_server):
sleep_time_for_fetch_thread = 5
ignore_text = "XXXXX\nYYYYY\nZZZZZ"
set_original_response()
@live_server.app.route('/test-ignore-endpoint')
def test_ignore_endpoint():
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open("test-datastore/output.txt", "r") as f:
return f.read()
live_server.start()
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_ignore_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Goto the edit page, add our ignore text
# Add our URL to the import page
res = client.post(
url_for("edit_page", uuid="first"),
data={"ignore-text": ignore_text, "url": test_url, "tag": "", "headers": ""},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Check it saved
res = client.get(
url_for("edit_page", uuid="first"),
)
assert bytes(ignore_text.encode('utf-8')) in res.data
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
assert b'/test-ignore-endpoint' in res.data
set_modified_response()
# Trigger a check
client.get(url_for("api_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'unviewed' class)
res = client.get(url_for("index"))
assert b'unviewed' not in res.data
assert b'/test-ignore-endpoint' in res.data
live_server.stop()
Loading…
Cancel
Save