From b76148a0f4837e7ca45346f6413c51ff7695d9f0 Mon Sep 17 00:00:00 2001 From: dgtlmoon Date: Wed, 14 Dec 2022 15:08:34 +0100 Subject: [PATCH] Fetcher - CPU usage - Skip processing if the previous checksum and the just fetched one was the same (#925) --- changedetectionio/__init__.py | 44 ++++----- changedetectionio/api/api_v1.py | 7 +- .../blueprint/price_data_follower/__init__.py | 5 +- changedetectionio/content_fetcher.py | 3 + changedetectionio/fetch_site_status.py | 12 ++- changedetectionio/model/Watch.py | 89 ++++++++++--------- changedetectionio/queuedWatchMetaData.py | 10 +++ changedetectionio/update_worker.py | 12 ++- 8 files changed, 102 insertions(+), 80 deletions(-) create mode 100644 changedetectionio/queuedWatchMetaData.py diff --git a/changedetectionio/__init__.py b/changedetectionio/__init__.py index edc6db74..b6b82a77 100644 --- a/changedetectionio/__init__.py +++ b/changedetectionio/__init__.py @@ -10,6 +10,7 @@ import threading import time import timeago +from changedetectionio import queuedWatchMetaData from copy import deepcopy from distutils.util import strtobool from feedgen.feed import FeedGenerator @@ -404,7 +405,6 @@ def changedetection_app(config=None, datastore_o=None): sorted_watches.append(watch) existing_tags = datastore.get_all_tags() - form = forms.quickWatchForm(request.form) output = render_template("watch-overview.html", form=form, @@ -416,7 +416,7 @@ def changedetection_app(config=None, datastore_o=None): # Don't link to hosting when we're on the hosting environment hosted_sticky=os.getenv("SALTED_PASS", False) == False, guid=datastore.data['app_guid'], - queued_uuids=[uuid for p,uuid in update_q.queue]) + queued_uuids=[q_uuid.item['uuid'] for q_uuid in update_q.queue]) if session.get('share-link'): @@ -596,25 +596,16 @@ def changedetection_app(config=None, datastore_o=None): using_default_check_time = False break - # Use the default if its the same as system wide + # Use the default if it's the same as system-wide. 
if form.fetch_backend.data == datastore.data['settings']['application']['fetch_backend']: extra_update_obj['fetch_backend'] = None + # Ignore text form_ignore_text = form.ignore_text.data datastore.data['watching'][uuid]['ignore_text'] = form_ignore_text - # Reset the previous_md5 so we process a new snapshot including stripping ignore text. - if form_ignore_text: - if len(datastore.data['watching'][uuid].history): - extra_update_obj['previous_md5'] = get_current_checksum_include_ignore_text(uuid=uuid) - - # Reset the previous_md5 so we process a new snapshot including stripping ignore text. - if form.include_filters.data != datastore.data['watching'][uuid].get('include_filters', []): - if len(datastore.data['watching'][uuid].history): - extra_update_obj['previous_md5'] = get_current_checksum_include_ignore_text(uuid=uuid) - # Be sure proxy value is None if datastore.proxy_list is not None and form.data['proxy'] == '': extra_update_obj['proxy'] = None @@ -632,7 +623,7 @@ def changedetection_app(config=None, datastore_o=None): datastore.needs_write_urgent = True # Queue the watch for immediate recheck, with a higher priority - update_q.put((1, uuid)) + update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': False})) # Diff page [edit] link should go back to diff page if request.args.get("next") and request.args.get("next") == 'diff': @@ -773,7 +764,7 @@ def changedetection_app(config=None, datastore_o=None): importer = import_url_list() importer.run(data=request.values.get('urls'), flash=flash, datastore=datastore) for uuid in importer.new_uuids: - update_q.put((1, uuid)) + update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': True})) if len(importer.remaining_data) == 0: return redirect(url_for('index')) @@ -786,7 +777,7 @@ def changedetection_app(config=None, datastore_o=None): d_importer = import_distill_io_json() 
d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore) for uuid in d_importer.new_uuids: - update_q.put((1, uuid)) + update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': True})) @@ -1151,7 +1142,7 @@ def changedetection_app(config=None, datastore_o=None): if not add_paused and new_uuid: # Straight into the queue. - update_q.put((1, new_uuid)) + update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid})) flash("Watch added.") if add_paused: @@ -1188,7 +1179,7 @@ def changedetection_app(config=None, datastore_o=None): uuid = list(datastore.data['watching'].keys()).pop() new_uuid = datastore.clone(uuid) - update_q.put((5, new_uuid)) + update_q.put(queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid, 'skip_when_checksum_same': True})) flash('Cloned.') return redirect(url_for('index')) @@ -1196,7 +1187,7 @@ def changedetection_app(config=None, datastore_o=None): @app.route("/api/checknow", methods=['GET']) @login_required def form_watch_checknow(): - + # Forced recheck will skip the 'skip if content is the same' rule tag = request.args.get('tag') uuid = request.args.get('uuid') i = 0 @@ -1205,11 +1196,9 @@ def changedetection_app(config=None, datastore_o=None): for t in running_update_threads: running_uuids.append(t.current_uuid) - # @todo check thread is running and skip - if uuid: if uuid not in running_uuids: - update_q.put((1, uuid)) + update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': False})) i = 1 elif tag != None: @@ -1217,14 +1206,14 @@ def changedetection_app(config=None, datastore_o=None): for watch_uuid, watch in datastore.data['watching'].items(): if (tag != None and tag in watch['tag']): if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']: - update_q.put((1, watch_uuid)) + 
update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid, 'skip_when_checksum_same': False})) i += 1 else: # No tag, no uuid, add everything. for watch_uuid, watch in datastore.data['watching'].items(): if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']: - update_q.put((1, watch_uuid)) + update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid, 'skip_when_checksum_same': False})) i += 1 flash("{} watches are queued for rechecking.".format(i)) return redirect(url_for('index', tag=tag)) @@ -1344,7 +1333,7 @@ def changedetection_app(config=None, datastore_o=None): app.register_blueprint(browser_steps.construct_blueprint(datastore), url_prefix='/browser-steps') import changedetectionio.blueprint.price_data_follower as price_data_follower - app.register_blueprint(price_data_follower.construct_blueprint(datastore), url_prefix='/price_data_follower') + app.register_blueprint(price_data_follower.construct_blueprint(datastore, update_q), url_prefix='/price_data_follower') # @todo handle ctrl break @@ -1492,7 +1481,7 @@ def ticker_thread_check_time_launch_checks(): seconds_since_last_recheck = now - watch['last_checked'] if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds: - if not uuid in running_uuids and uuid not in [q_uuid for p,q_uuid in update_q.queue]: + if not uuid in running_uuids and uuid not in [q_uuid.item['uuid'] for q_uuid in update_q.queue]: # Proxies can be set to have a limit on seconds between which they can be called watch_proxy = datastore.get_preferred_proxy_for_watch(uuid=uuid) @@ -1523,8 +1512,9 @@ def ticker_thread_check_time_launch_checks(): priority, watch.jitter_seconds, now - watch['last_checked'])) + # Into the queue with you - update_q.put((priority, uuid)) + update_q.put(queuedWatchMetaData.PrioritizedItem(priority=priority, item={'uuid': uuid, 
'skip_when_checksum_same': True})) # Reset for next time watch.jitter_seconds = 0 diff --git a/changedetectionio/api/api_v1.py b/changedetectionio/api/api_v1.py index 40131ca5..8ebdfa22 100644 --- a/changedetectionio/api/api_v1.py +++ b/changedetectionio/api/api_v1.py @@ -1,3 +1,4 @@ +from changedetectionio import queuedWatchMetaData from flask_restful import abort, Resource from flask import request, make_response import validators @@ -24,7 +25,7 @@ class Watch(Resource): abort(404, message='No watch exists with the UUID of {}'.format(uuid)) if request.args.get('recheck'): - self.update_q.put((1, uuid)) + self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': True})) return "OK", 200 # Return without history, get that via another API call @@ -100,7 +101,7 @@ class CreateWatch(Resource): extras = {'title': json_data['title'].strip()} if json_data.get('title') else {} new_uuid = self.datastore.add_watch(url=json_data['url'].strip(), tag=tag, extras=extras) - self.update_q.put((1, new_uuid)) + self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid, 'skip_when_checksum_same': True})) return {'uuid': new_uuid}, 201 # Return concise list of available watches and some very basic info @@ -118,7 +119,7 @@ class CreateWatch(Resource): if request.args.get('recheck_all'): for uuid in self.datastore.data['watching'].keys(): - self.update_q.put((1, uuid)) + self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': True})) return {'status': "OK"}, 200 return list, 200 diff --git a/changedetectionio/blueprint/price_data_follower/__init__.py b/changedetectionio/blueprint/price_data_follower/__init__.py index 835e8a77..42aaee99 100644 --- a/changedetectionio/blueprint/price_data_follower/__init__.py +++ b/changedetectionio/blueprint/price_data_follower/__init__.py @@ -3,11 +3,13 @@ from distutils.util import strtobool from flask import 
Blueprint, flash, redirect, url_for from flask_login import login_required from changedetectionio.store import ChangeDetectionStore +from changedetectionio import queuedWatchMetaData +from queue import PriorityQueue PRICE_DATA_TRACK_ACCEPT = 'accepted' PRICE_DATA_TRACK_REJECT = 'rejected' -def construct_blueprint(datastore: ChangeDetectionStore): +def construct_blueprint(datastore: ChangeDetectionStore, update_q: PriorityQueue): price_data_follower_blueprint = Blueprint('price_data_follower', __name__) @@ -15,6 +17,7 @@ def construct_blueprint(datastore: ChangeDetectionStore): @price_data_follower_blueprint.route("//accept", methods=['GET']) def accept(uuid): datastore.data['watching'][uuid]['track_ldjson_price_data'] = PRICE_DATA_TRACK_ACCEPT + update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': False})) return redirect(url_for("form_watch_checknow", uuid=uuid)) diff --git a/changedetectionio/content_fetcher.py b/changedetectionio/content_fetcher.py index 8656047d..a0119e91 100644 --- a/changedetectionio/content_fetcher.py +++ b/changedetectionio/content_fetcher.py @@ -23,6 +23,9 @@ class Non200ErrorCodeReceived(Exception): self.page_text = html_tools.html_to_text(page_html) return +class checksumFromPreviousCheckWasTheSame(Exception): + def __init__(self): + return class JSActionExceptions(Exception): def __init__(self, status_code, url, screenshot, message=''): diff --git a/changedetectionio/fetch_site_status.py b/changedetectionio/fetch_site_status.py index 2a08b42f..7dfd38b6 100644 --- a/changedetectionio/fetch_site_status.py +++ b/changedetectionio/fetch_site_status.py @@ -6,6 +6,7 @@ import urllib3 from changedetectionio import content_fetcher, html_tools from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT +from copy import deepcopy urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -38,8 +39,7 @@ class perform_site_check(): 
return regex - def run(self, uuid): - from copy import deepcopy + def run(self, uuid, skip_when_checksum_same=True): changed_detected = False screenshot = False # as bytes stripped_text_from_html = "" @@ -122,6 +122,14 @@ class perform_site_check(): self.screenshot = fetcher.screenshot self.xpath_data = fetcher.xpath_data + # Watches added automatically in the queue manager will skip if it's the same checksum as the previous run + # Saves a lot of CPU + update_obj['previous_md5_before_filters'] = hashlib.md5(fetcher.content.encode('utf-8')).hexdigest() + if skip_when_checksum_same: + if update_obj['previous_md5_before_filters'] == watch.get('previous_md5_before_filters'): + raise content_fetcher.checksumFromPreviousCheckWasTheSame() + + # Fetching complete, now filters # @todo move to class / maybe inside of fetcher abstract base? diff --git a/changedetectionio/model/Watch.py b/changedetectionio/model/Watch.py index d1183752..40f5de0b 100644 --- a/changedetectionio/model/Watch.py +++ b/changedetectionio/model/Watch.py @@ -14,51 +14,52 @@ from changedetectionio.notification import ( class model(dict): __newest_history_key = None - __history_n=0 + __history_n = 0 __base_config = { - #'history': {}, # Dict of timestamp and output stripped filename (removed) - #'newest_history_key': 0, (removed, taken from history.txt index) - 'body': None, - 'check_unique_lines': False, # On change-detected, compare against all history if its something new - 'check_count': 0, - 'consecutive_filter_failures': 0, # Every time the CSS/xPath filter cannot be located, reset when all is fine. 
- 'extract_text': [], # Extract text by regex after filters - 'extract_title_as_title': False, - 'fetch_backend': None, - 'filter_failure_notification_send': strtobool(os.getenv('FILTER_FAILURE_NOTIFICATION_SEND_DEFAULT', 'True')), - 'has_ldjson_price_data': None, - 'track_ldjson_price_data': None, - 'headers': {}, # Extra headers to send - 'ignore_text': [], # List of text to ignore when calculating the comparison checksum - 'include_filters': [], - 'last_checked': 0, - 'last_error': False, - 'last_viewed': 0, # history key value of the last viewed via the [diff] link - 'method': 'GET', - # Custom notification content - 'notification_body': None, - 'notification_format': default_notification_format_for_watch, - 'notification_muted': False, - 'notification_title': None, - 'notification_screenshot': False, # Include the latest screenshot if available and supported by the apprise URL - 'notification_urls': [], # List of URLs to add to the notification Queue (Usually AppRise) - 'paused': False, - 'previous_md5': False, - 'proxy': None, # Preferred proxy connection - 'subtractive_selectors': [], - 'tag': None, - 'text_should_not_be_present': [], # Text that should not present - # Re #110, so then if this is set to None, we know to use the default value instead - # Requires setting to None on submit if it's the same as the default - # Should be all None by default, so we use the system default in this case. 
- 'time_between_check': {'weeks': None, 'days': None, 'hours': None, 'minutes': None, 'seconds': None}, - 'title': None, - 'trigger_text': [], # List of text or regex to wait for until a change is detected - 'url': None, - 'uuid': str(uuid.uuid4()), - 'webdriver_delay': None, - 'webdriver_js_execute_code': None, # Run before change-detection - } + # 'history': {}, # Dict of timestamp and output stripped filename (removed) + # 'newest_history_key': 0, (removed, taken from history.txt index) + 'body': None, + 'check_unique_lines': False, # On change-detected, compare against all history if its something new + 'check_count': 0, + 'consecutive_filter_failures': 0, # Every time the CSS/xPath filter cannot be located, reset when all is fine. + 'extract_text': [], # Extract text by regex after filters + 'extract_title_as_title': False, + 'fetch_backend': None, + 'filter_failure_notification_send': strtobool(os.getenv('FILTER_FAILURE_NOTIFICATION_SEND_DEFAULT', 'True')), + 'has_ldjson_price_data': None, + 'track_ldjson_price_data': None, + 'headers': {}, # Extra headers to send + 'ignore_text': [], # List of text to ignore when calculating the comparison checksum + 'include_filters': [], + 'last_checked': 0, + 'last_error': False, + 'last_viewed': 0, # history key value of the last viewed via the [diff] link + 'method': 'GET', + # Custom notification content + 'notification_body': None, + 'notification_format': default_notification_format_for_watch, + 'notification_muted': False, + 'notification_title': None, + 'notification_screenshot': False, # Include the latest screenshot if available and supported by the apprise URL + 'notification_urls': [], # List of URLs to add to the notification Queue (Usually AppRise) + 'paused': False, + 'previous_md5': False, + 'previous_md5_before_filters': False, # Used for skipping changedetection entirely + 'proxy': None, # Preferred proxy connection + 'subtractive_selectors': [], + 'tag': None, + 'text_should_not_be_present': [], # Text 
that should not present + # Re #110, so then if this is set to None, we know to use the default value instead + # Requires setting to None on submit if it's the same as the default + # Should be all None by default, so we use the system default in this case. + 'time_between_check': {'weeks': None, 'days': None, 'hours': None, 'minutes': None, 'seconds': None}, + 'title': None, + 'trigger_text': [], # List of text or regex to wait for until a change is detected + 'url': None, + 'uuid': str(uuid.uuid4()), + 'webdriver_delay': None, + 'webdriver_js_execute_code': None, # Run before change-detection + } jitter_seconds = 0 def __init__(self, *arg, **kw): diff --git a/changedetectionio/queuedWatchMetaData.py b/changedetectionio/queuedWatchMetaData.py new file mode 100644 index 00000000..be388a78 --- /dev/null +++ b/changedetectionio/queuedWatchMetaData.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass, field +from typing import Any + +# So that we can queue some metadata in `item` +# https://docs.python.org/3/library/queue.html#queue.PriorityQueue +# +@dataclass(order=True) +class PrioritizedItem: + priority: int + item: Any=field(compare=False) diff --git a/changedetectionio/update_worker.py b/changedetectionio/update_worker.py index 605c0ba9..0e902a87 100644 --- a/changedetectionio/update_worker.py +++ b/changedetectionio/update_worker.py @@ -4,6 +4,7 @@ import queue import time from changedetectionio import content_fetcher +from changedetectionio import queuedWatchMetaData from changedetectionio.fetch_site_status import FilterNotFoundInResponse # A single update worker @@ -157,11 +158,12 @@ class update_worker(threading.Thread): while not self.app.config.exit.is_set(): try: - priority, uuid = self.q.get(block=False) + queued_item_data = self.q.get(block=False) except queue.Empty: pass else: + uuid = queued_item_data.item.get('uuid') self.current_uuid = uuid if uuid in list(self.datastore.data['watching'].keys()): @@ -171,11 +173,11 @@ class 
update_worker(threading.Thread): update_obj= {} xpath_data = False process_changedetection_results = True - print("> Processing UUID {} Priority {} URL {}".format(uuid, priority, self.datastore.data['watching'][uuid]['url'])) + print("> Processing UUID {} Priority {} URL {}".format(uuid, queued_item_data.priority, self.datastore.data['watching'][uuid]['url'])) now = time.time() try: - changed_detected, update_obj, contents = update_handler.run(uuid) + changed_detected, update_obj, contents = update_handler.run(uuid, skip_when_checksum_same=queued_item_data.item.get('skip_when_checksum_same')) # Re #342 # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes. # We then convert/.decode('utf-8') for the notification etc @@ -241,6 +243,10 @@ class update_worker(threading.Thread): process_changedetection_results = True + except content_fetcher.checksumFromPreviousCheckWasTheSame as e: + # Yes fine, so nothing to do + pass + except content_fetcher.BrowserStepsStepTimout as e: if not self.datastore.data['watching'].get(uuid):