Merge branch 'master' into test-cleanups

test-cleanups
dgtlmoon 2 years ago committed by GitHub
commit bc79c60873
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -38,7 +38,7 @@ from flask_paginate import Pagination, get_page_parameter
from changedetectionio import html_tools from changedetectionio import html_tools
from changedetectionio.api import api_v1 from changedetectionio.api import api_v1
__version__ = '0.43' __version__ = '0.43.1'
datastore = None datastore = None

@ -219,13 +219,15 @@ class CreateWatch(Resource):
extras = copy.deepcopy(json_data) extras = copy.deepcopy(json_data)
# Because we renamed 'tag' to 'tags' but dont want to change the API (can do this in v2 of the API) # Because we renamed 'tag' to 'tags' but don't want to change the API (can do this in v2 of the API)
tags = None
if extras.get('tag'): if extras.get('tag'):
extras['tags'] = extras.get('tag') tags = extras.get('tag')
del extras['tag']
del extras['url'] del extras['url']
new_uuid = self.datastore.add_watch(url=url, extras=extras) new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags)
if new_uuid: if new_uuid:
self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid, 'skip_when_checksum_same': True})) self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid, 'skip_when_checksum_same': True}))
return {'uuid': new_uuid}, 201 return {'uuid': new_uuid}, 201

@ -76,6 +76,16 @@ def construct_blueprint(datastore: ChangeDetectionStore):
flash(f"Tag unlinked removed from {unlinked} watches") flash(f"Tag unlinked removed from {unlinked} watches")
return redirect(url_for('tags.tags_overview_page')) return redirect(url_for('tags.tags_overview_page'))
@tags_blueprint.route("/delete_all", methods=['GET'])
@login_optionally_required
def delete_all():
for watch_uuid, watch in datastore.data['watching'].items():
watch['tags'] = []
datastore.data['settings']['application']['tags'] = {}
flash(f"All tags deleted")
return redirect(url_for('tags.tags_overview_page'))
@tags_blueprint.route("/edit/<string:uuid>", methods=['GET']) @tags_blueprint.route("/edit/<string:uuid>", methods=['GET'])
@login_optionally_required @login_optionally_required
def form_tag_edit(uuid): def form_tag_edit(uuid):

@ -85,6 +85,7 @@ class import_distill_io_json(Importer):
now = time.time() now = time.time()
self.new_uuids=[] self.new_uuids=[]
# @todo Use JSONSchema like in the API to validate here.
try: try:
data = json.loads(data.strip()) data = json.loads(data.strip())
@ -120,11 +121,8 @@ class import_distill_io_json(Importer):
except IndexError: except IndexError:
pass pass
# Does this need to be here anymore?
if d.get('tags', False):
extras['tags'] = ", ".join(d['tags'])
new_uuid = datastore.add_watch(url=d['uri'].strip(), new_uuid = datastore.add_watch(url=d['uri'].strip(),
tag=",".join(d.get('tags', [])),
extras=extras, extras=extras,
write_to_disk_now=False) write_to_disk_now=False)

@ -93,6 +93,12 @@ def process_notification(n_object, datastore):
valid_notification_formats[default_notification_format], valid_notification_formats[default_notification_format],
) )
# If we arrived with 'System default' then look it up
if n_format == default_notification_format_for_watch and datastore.data['settings']['application'].get('notification_format') != default_notification_format_for_watch:
# Initially text or whatever
n_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format])
# https://github.com/caronc/apprise/wiki/Development_LogCapture # https://github.com/caronc/apprise/wiki/Development_LogCapture
# Anything higher than or equal to WARNING (which covers things like Connection errors) # Anything higher than or equal to WARNING (which covers things like Connection errors)
# raise it as an exception # raise it as an exception

@ -205,10 +205,9 @@ class ChangeDetectionStore:
# Clone a watch by UUID # Clone a watch by UUID
def clone(self, uuid): def clone(self, uuid):
url = self.data['watching'][uuid]['url'] url = self.data['watching'][uuid].get('url')
tag = self.data['watching'][uuid].get('tags',[])
extras = self.data['watching'][uuid] extras = self.data['watching'][uuid]
new_uuid = self.add_watch(url=url, tag_uuids=tag, extras=extras) new_uuid = self.add_watch(url=url, extras=extras)
return new_uuid return new_uuid
def url_exists(self, url): def url_exists(self, url):
@ -248,12 +247,9 @@ class ChangeDetectionStore:
if extras is None: if extras is None:
extras = {} extras = {}
# should always be str
if tag is None or not tag:
tag = ''
# Incase these are copied across, assume it's a reference and deepcopy() # Incase these are copied across, assume it's a reference and deepcopy()
apply_extras = deepcopy(extras) apply_extras = deepcopy(extras)
apply_extras['tags'] = [] if not apply_extras.get('tags') else apply_extras.get('tags')
# Was it a share link? try to fetch the data # Was it a share link? try to fetch the data
if (url.startswith("https://changedetection.io/share/")): if (url.startswith("https://changedetection.io/share/")):
@ -303,20 +299,22 @@ class ChangeDetectionStore:
flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error') flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
return None return None
if tag and type(tag) == str:
# #Re 569 # Then it's probably a string of the actual tag by name, split and add it
# Could be in 'tags', var or extras, smash them together and strip for t in tag.split(','):
apply_extras['tags'] = []
if tag or extras.get('tags'):
tags = list(filter(None, list(set().union(tag.split(','), extras.get('tags', '').split(',')))))
for t in list(map(str.strip, tags)):
# for each stripped tag, add tag as UUID # for each stripped tag, add tag as UUID
apply_extras['tags'].append(self.add_tag(t)) for a_t in t.split(','):
tag_uuid = self.add_tag(a_t)
apply_extras['tags'].append(tag_uuid)
# Or if UUIDs given directly # Or if UUIDs given directly
if tag_uuids: if tag_uuids:
apply_extras['tags'] = list(set(apply_extras['tags'] + tag_uuids)) apply_extras['tags'] = list(set(apply_extras['tags'] + tag_uuids))
# Make any uuids unique
if apply_extras.get('tags'):
apply_extras['tags'] = list(set(apply_extras.get('tags')))
new_watch = Watch.model(datastore_path=self.datastore_path, url=url) new_watch = Watch.model(datastore_path=self.datastore_path, url=url)
new_uuid = new_watch.get('uuid') new_uuid = new_watch.get('uuid')
@ -568,7 +566,7 @@ class ChangeDetectionStore:
def add_tag(self, name): def add_tag(self, name):
# If name exists, return that # If name exists, return that
n = name.strip().lower() n = name.strip().lower()
print (f">>> Adding new tag - '{n}") print (f">>> Adding new tag - '{n}'")
if not n: if not n:
return False return False

@ -267,7 +267,9 @@ def test_api_watch_PUT_update(client, live_server):
#live_server_setup(live_server) #live_server_setup(live_server)
api_key = extract_api_key_from_UI(client) api_key = extract_api_key_from_UI(client)
wait_for_all_checks(client) wait_for_all_checks(client)
# Create a watch # Create a watch
set_original_response() set_original_response()
test_url = url_for('test_endpoint', _external=True, test_url = url_for('test_endpoint', _external=True,
@ -283,8 +285,10 @@ def test_api_watch_PUT_update(client, live_server):
assert res.status_code == 201 assert res.status_code == 201
wait_for_all_checks(client) wait_for_all_checks(client)
# Get a listing, it will be the first one # Get a listing, it will be the first one
res = client.get( res = client.get(
url_for("createwatch"), url_for("createwatch"),

@ -2,7 +2,7 @@
import time import time
from flask import url_for from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, extract_UUID_from_client
import os import os
@ -154,6 +154,10 @@ def test_tag_add_in_ui(client, live_server):
) )
assert b"Tag added" in res.data assert b"Tag added" in res.data
assert b"new-test-tag" in res.data assert b"new-test-tag" in res.data
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True) res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data assert b'Deleted' in res.data
@ -219,12 +223,10 @@ def test_group_tag_notification(client, live_server):
assert "test-tag" in notification_submission assert "test-tag" in notification_submission
assert "other-tag" in notification_submission assert "other-tag" in notification_submission
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
#@todo Test that multiple notifications fired #@todo Test that multiple notifications fired
#@todo Test that each of multiple notifications with different settings #@todo Test that each of multiple notifications with different settings
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_limit_tag_ui(client, live_server): def test_limit_tag_ui(client, live_server):
#live_server_setup(live_server) #live_server_setup(live_server)
@ -260,3 +262,61 @@ def test_limit_tag_ui(client, live_server):
assert b'test-tag' in res.data assert b'test-tag' in res.data
assert res.data.count(b'processor-text_json_diff') == 20 assert res.data.count(b'processor-text_json_diff') == 20
assert b"object at" not in res.data assert b"object at" not in res.data
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
def test_clone_tag_on_import(client, live_server):
#live_server_setup(live_server)
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("import_page"),
data={"urls": test_url + " test-tag, another-tag\r\n"},
follow_redirects=True
)
assert b"1 Imported" in res.data
res = client.get(url_for("index"))
assert b'test-tag' in res.data
assert b'another-tag' in res.data
watch_uuid = extract_UUID_from_client(client)
res = client.get(url_for("form_clone", uuid=watch_uuid), follow_redirects=True)
assert b'Cloned' in res.data
# 2 times plus the top link to tag
assert res.data.count(b'test-tag') == 3
assert res.data.count(b'another-tag') == 3
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_clone_tag_on_quickwatchform_add(client, live_server):
#live_server_setup(live_server)
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("form_quick_watch_add"),
data={"url": test_url, "tags": ' test-tag, another-tag '},
follow_redirects=True
)
assert b"Watch added" in res.data
res = client.get(url_for("index"))
assert b'test-tag' in res.data
assert b'another-tag' in res.data
watch_uuid = extract_UUID_from_client(client)
res = client.get(url_for("form_clone", uuid=watch_uuid), follow_redirects=True)
assert b'Cloned' in res.data
# 2 times plus the top link to tag
assert res.data.count(b'test-tag') == 3
assert res.data.count(b'another-tag') == 3
res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data

@ -112,6 +112,7 @@ def test_import_distillio(client, live_server):
# did the tags work? # did the tags work?
res = client.get( url_for("index")) res = client.get( url_for("index"))
# check tags
assert b"nice stuff" in res.data assert b"nice stuff" in res.data
assert b"nerd-news" in res.data assert b"nerd-news" in res.data

@ -24,9 +24,6 @@ def test_check_notification(client, live_server):
#live_server_setup(live_server) #live_server_setup(live_server)
set_original_response() set_original_response()
# Give the endpoint time to spin up
time.sleep(1)
# Re 360 - new install should have defaults set # Re 360 - new install should have defaults set
res = client.get(url_for("settings_page")) res = client.get(url_for("settings_page"))
notification_url = url_for('test_notification_endpoint', _external=True).replace('http', 'json') notification_url = url_for('test_notification_endpoint', _external=True).replace('http', 'json')
@ -142,8 +139,7 @@ def test_check_notification(client, live_server):
# Did we see the URL that had a change, in the notification? # Did we see the URL that had a change, in the notification?
# Diff was correctly executed # Diff was correctly executed
assert test_url in notification_submission
assert ':-)' in notification_submission
assert "Diff Full: Some initial text" in notification_submission assert "Diff Full: Some initial text" in notification_submission
assert "Diff: (changed) Which is across multiple lines" in notification_submission assert "Diff: (changed) Which is across multiple lines" in notification_submission
assert "(into) which has this one new line" in notification_submission assert "(into) which has this one new line" in notification_submission
@ -156,7 +152,8 @@ def test_check_notification(client, live_server):
assert "preview/" in notification_submission assert "preview/" in notification_submission
assert ":-)" in notification_submission assert ":-)" in notification_submission
assert "New ChangeDetection.io Notification - {}".format(test_url) in notification_submission assert "New ChangeDetection.io Notification - {}".format(test_url) in notification_submission
assert test_url in notification_submission
assert ':-)' in notification_submission
# Check the attachment was added, and that it is a JPEG from the original PNG # Check the attachment was added, and that it is a JPEG from the original PNG
notification_submission_object = json.loads(notification_submission) notification_submission_object = json.loads(notification_submission)
# We keep PNG screenshots for now # We keep PNG screenshots for now

@ -65,15 +65,45 @@ class update_worker(threading.Thread):
logging.info (">> SENDING NOTIFICATION") logging.info (">> SENDING NOTIFICATION")
self.notification_q.put(n_object) self.notification_q.put(n_object)
# Prefer - Individual watch settings > Tag settings > Global settings (in that order)
def send_content_changed_notification(self, watch_uuid): def _check_cascading_vars(self, var_name, watch):
from changedetectionio.notification import ( from changedetectionio.notification import (
default_notification_format_for_watch default_notification_format_for_watch,
default_notification_body,
default_notification_title
) )
# Would be better if this was some kind of Object where Watch can reference the parent datastore etc
v = watch.get(var_name)
if v and not watch.get('notification_muted'):
return v
tags = self.datastore.get_all_tags_for_watch(uuid=watch.get('uuid'))
if tags:
for tag_uuid, tag in tags.items():
v = tag.get(var_name)
if v and not tag.get('notification_muted'):
return v
if self.datastore.data['settings']['application'].get(var_name):
return self.datastore.data['settings']['application'].get(var_name)
# Otherwise could be defaults
if var_name == 'notification_format':
return default_notification_format_for_watch
if var_name == 'notification_body':
return default_notification_body
if var_name == 'notification_title':
return default_notification_title
return None
def send_content_changed_notification(self, watch_uuid):
n_object = {} n_object = {}
watch = self.datastore.data['watching'].get(watch_uuid, False) watch = self.datastore.data['watching'].get(watch_uuid)
if not watch: if not watch:
return return
@ -87,57 +117,20 @@ class update_worker(threading.Thread):
) )
# Should be a better parent getter in the model object # Should be a better parent getter in the model object
# Prefer - Individual watch settings > Tag settings > Global settings (in that order)
n_object['notification_urls'] = watch.get('notification_urls')
n_object['notification_title'] = watch['notification_title'] if watch['notification_title'] else \
self.datastore.data['settings']['application']['notification_title']
n_object['notification_body'] = watch['notification_body'] if watch['notification_body'] else \ # Prefer - Individual watch settings > Tag settings > Global settings (in that order)
self.datastore.data['settings']['application']['notification_body'] n_object['notification_urls'] = self._check_cascading_vars('notification_urls', watch)
n_object['notification_title'] = self._check_cascading_vars('notification_title', watch)
n_object['notification_format'] = watch['notification_format'] if watch['notification_format'] != default_notification_format_for_watch else \ n_object['notification_body'] = self._check_cascading_vars('notification_body', watch)
self.datastore.data['settings']['application']['notification_format'] n_object['notification_format'] = self._check_cascading_vars('notification_format', watch)
# (Individual watch) Only prepare to notify if the rules above matched # (Individual watch) Only prepare to notify if the rules above matched
sent = False queued = False
if 'notification_urls' in n_object and n_object['notification_urls']: if n_object and n_object.get('notification_urls'):
sent = True queued = True
self.queue_notification_for_watch(n_object, watch)
# (Group tags) try by group tag
if not sent:
# Else, Try by tag, and use system default vars for format, body etc as fallback
tags = self.datastore.get_all_tags_for_watch(uuid=watch_uuid)
for tag_uuid, tag in tags.items():
n_object = {}
n_object['notification_urls'] = tag.get('notification_urls')
n_object['notification_title'] = tag.get('notification_title') if tag.get('notification_title') else \
self.datastore.data['settings']['application']['notification_title']
n_object['notification_body'] = tag.get('notification_body') if tag.get('notification_body') else \
self.datastore.data['settings']['application']['notification_body']
n_object['notification_format'] = tag.get('notification_format') if tag.get('notification_format') != default_notification_format_for_watch else \
self.datastore.data['settings']['application']['notification_format']
if 'notification_urls' in n_object and n_object.get('notification_urls') and not tag.get('notification_muted'):
sent = True
self.queue_notification_for_watch(n_object, watch)
# (Group tags) try by global
if not sent:
# leave this as is, but repeat in a loop for each tag also
n_object['notification_urls'] = self.datastore.data['settings']['application'].get('notification_urls')
n_object['notification_title'] = self.datastore.data['settings']['application'].get('notification_title')
n_object['notification_body'] = self.datastore.data['settings']['application'].get('notification_body')
n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')
if n_object.get('notification_urls') and n_object.get('notification_body') and n_object.get('notification_title'):
sent = True
self.queue_notification_for_watch(n_object, watch) self.queue_notification_for_watch(n_object, watch)
return sent return queued
def send_filter_failure_notification(self, watch_uuid): def send_filter_failure_notification(self, watch_uuid):

Loading…
Cancel
Save