Fetcher - CPU usage - Skip processing if the previous checksum and the just fetched one was the same (#925)

pull/1232/head
dgtlmoon 2 years ago committed by GitHub
parent 93cc30437f
commit b76148a0f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -10,6 +10,7 @@ import threading
import time import time
import timeago import timeago
from changedetectionio import queuedWatchMetaData
from copy import deepcopy from copy import deepcopy
from distutils.util import strtobool from distutils.util import strtobool
from feedgen.feed import FeedGenerator from feedgen.feed import FeedGenerator
@ -404,7 +405,6 @@ def changedetection_app(config=None, datastore_o=None):
sorted_watches.append(watch) sorted_watches.append(watch)
existing_tags = datastore.get_all_tags() existing_tags = datastore.get_all_tags()
form = forms.quickWatchForm(request.form) form = forms.quickWatchForm(request.form)
output = render_template("watch-overview.html", output = render_template("watch-overview.html",
form=form, form=form,
@ -416,7 +416,7 @@ def changedetection_app(config=None, datastore_o=None):
# Don't link to hosting when we're on the hosting environment # Don't link to hosting when we're on the hosting environment
hosted_sticky=os.getenv("SALTED_PASS", False) == False, hosted_sticky=os.getenv("SALTED_PASS", False) == False,
guid=datastore.data['app_guid'], guid=datastore.data['app_guid'],
queued_uuids=[uuid for p,uuid in update_q.queue]) queued_uuids=[q_uuid.item['uuid'] for q_uuid in update_q.queue])
if session.get('share-link'): if session.get('share-link'):
@ -596,25 +596,16 @@ def changedetection_app(config=None, datastore_o=None):
using_default_check_time = False using_default_check_time = False
break break
# Use the default if its the same as system wide # Use the default if it's the same as system-wide.
if form.fetch_backend.data == datastore.data['settings']['application']['fetch_backend']: if form.fetch_backend.data == datastore.data['settings']['application']['fetch_backend']:
extra_update_obj['fetch_backend'] = None extra_update_obj['fetch_backend'] = None
# Ignore text # Ignore text
form_ignore_text = form.ignore_text.data form_ignore_text = form.ignore_text.data
datastore.data['watching'][uuid]['ignore_text'] = form_ignore_text datastore.data['watching'][uuid]['ignore_text'] = form_ignore_text
# Reset the previous_md5 so we process a new snapshot including stripping ignore text.
if form_ignore_text:
if len(datastore.data['watching'][uuid].history):
extra_update_obj['previous_md5'] = get_current_checksum_include_ignore_text(uuid=uuid)
# Reset the previous_md5 so we process a new snapshot including stripping ignore text.
if form.include_filters.data != datastore.data['watching'][uuid].get('include_filters', []):
if len(datastore.data['watching'][uuid].history):
extra_update_obj['previous_md5'] = get_current_checksum_include_ignore_text(uuid=uuid)
# Be sure proxy value is None # Be sure proxy value is None
if datastore.proxy_list is not None and form.data['proxy'] == '': if datastore.proxy_list is not None and form.data['proxy'] == '':
extra_update_obj['proxy'] = None extra_update_obj['proxy'] = None
@ -632,7 +623,7 @@ def changedetection_app(config=None, datastore_o=None):
datastore.needs_write_urgent = True datastore.needs_write_urgent = True
# Queue the watch for immediate recheck, with a higher priority # Queue the watch for immediate recheck, with a higher priority
update_q.put((1, uuid)) update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': False}))
# Diff page [edit] link should go back to diff page # Diff page [edit] link should go back to diff page
if request.args.get("next") and request.args.get("next") == 'diff': if request.args.get("next") and request.args.get("next") == 'diff':
@ -773,7 +764,7 @@ def changedetection_app(config=None, datastore_o=None):
importer = import_url_list() importer = import_url_list()
importer.run(data=request.values.get('urls'), flash=flash, datastore=datastore) importer.run(data=request.values.get('urls'), flash=flash, datastore=datastore)
for uuid in importer.new_uuids: for uuid in importer.new_uuids:
update_q.put((1, uuid)) update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': True}))
if len(importer.remaining_data) == 0: if len(importer.remaining_data) == 0:
return redirect(url_for('index')) return redirect(url_for('index'))
@ -786,7 +777,7 @@ def changedetection_app(config=None, datastore_o=None):
d_importer = import_distill_io_json() d_importer = import_distill_io_json()
d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore) d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore)
for uuid in d_importer.new_uuids: for uuid in d_importer.new_uuids:
update_q.put((1, uuid)) update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': True}))
@ -1151,7 +1142,7 @@ def changedetection_app(config=None, datastore_o=None):
if not add_paused and new_uuid: if not add_paused and new_uuid:
# Straight into the queue. # Straight into the queue.
update_q.put((1, new_uuid)) update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
flash("Watch added.") flash("Watch added.")
if add_paused: if add_paused:
@ -1188,7 +1179,7 @@ def changedetection_app(config=None, datastore_o=None):
uuid = list(datastore.data['watching'].keys()).pop() uuid = list(datastore.data['watching'].keys()).pop()
new_uuid = datastore.clone(uuid) new_uuid = datastore.clone(uuid)
update_q.put((5, new_uuid)) update_q.put(queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid, 'skip_when_checksum_same': True}))
flash('Cloned.') flash('Cloned.')
return redirect(url_for('index')) return redirect(url_for('index'))
@ -1196,7 +1187,7 @@ def changedetection_app(config=None, datastore_o=None):
@app.route("/api/checknow", methods=['GET']) @app.route("/api/checknow", methods=['GET'])
@login_required @login_required
def form_watch_checknow(): def form_watch_checknow():
# Forced recheck will skip the 'skip if content is the same' rule (, 'reprocess_existing_data': True})))
tag = request.args.get('tag') tag = request.args.get('tag')
uuid = request.args.get('uuid') uuid = request.args.get('uuid')
i = 0 i = 0
@ -1205,11 +1196,9 @@ def changedetection_app(config=None, datastore_o=None):
for t in running_update_threads: for t in running_update_threads:
running_uuids.append(t.current_uuid) running_uuids.append(t.current_uuid)
# @todo check thread is running and skip
if uuid: if uuid:
if uuid not in running_uuids: if uuid not in running_uuids:
update_q.put((1, uuid)) update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': False}))
i = 1 i = 1
elif tag != None: elif tag != None:
@ -1217,14 +1206,14 @@ def changedetection_app(config=None, datastore_o=None):
for watch_uuid, watch in datastore.data['watching'].items(): for watch_uuid, watch in datastore.data['watching'].items():
if (tag != None and tag in watch['tag']): if (tag != None and tag in watch['tag']):
if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']: if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']:
update_q.put((1, watch_uuid)) update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid, 'skip_when_checksum_same': False}))
i += 1 i += 1
else: else:
# No tag, no uuid, add everything. # No tag, no uuid, add everything.
for watch_uuid, watch in datastore.data['watching'].items(): for watch_uuid, watch in datastore.data['watching'].items():
if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']: if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']:
update_q.put((1, watch_uuid)) update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid, 'skip_when_checksum_same': False}))
i += 1 i += 1
flash("{} watches are queued for rechecking.".format(i)) flash("{} watches are queued for rechecking.".format(i))
return redirect(url_for('index', tag=tag)) return redirect(url_for('index', tag=tag))
@ -1344,7 +1333,7 @@ def changedetection_app(config=None, datastore_o=None):
app.register_blueprint(browser_steps.construct_blueprint(datastore), url_prefix='/browser-steps') app.register_blueprint(browser_steps.construct_blueprint(datastore), url_prefix='/browser-steps')
import changedetectionio.blueprint.price_data_follower as price_data_follower import changedetectionio.blueprint.price_data_follower as price_data_follower
app.register_blueprint(price_data_follower.construct_blueprint(datastore), url_prefix='/price_data_follower') app.register_blueprint(price_data_follower.construct_blueprint(datastore, update_q), url_prefix='/price_data_follower')
# @todo handle ctrl break # @todo handle ctrl break
@ -1492,7 +1481,7 @@ def ticker_thread_check_time_launch_checks():
seconds_since_last_recheck = now - watch['last_checked'] seconds_since_last_recheck = now - watch['last_checked']
if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds: if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds:
if not uuid in running_uuids and uuid not in [q_uuid for p,q_uuid in update_q.queue]: if not uuid in running_uuids and uuid not in [q_uuid.item['uuid'] for q_uuid in update_q.queue]:
# Proxies can be set to have a limit on seconds between which they can be called # Proxies can be set to have a limit on seconds between which they can be called
watch_proxy = datastore.get_preferred_proxy_for_watch(uuid=uuid) watch_proxy = datastore.get_preferred_proxy_for_watch(uuid=uuid)
@ -1523,8 +1512,9 @@ def ticker_thread_check_time_launch_checks():
priority, priority,
watch.jitter_seconds, watch.jitter_seconds,
now - watch['last_checked'])) now - watch['last_checked']))
# Into the queue with you # Into the queue with you
update_q.put((priority, uuid)) update_q.put(queuedWatchMetaData.PrioritizedItem(priority=priority, item={'uuid': uuid, 'skip_when_checksum_same': True}))
# Reset for next time # Reset for next time
watch.jitter_seconds = 0 watch.jitter_seconds = 0

@ -1,3 +1,4 @@
from changedetectionio import queuedWatchMetaData
from flask_restful import abort, Resource from flask_restful import abort, Resource
from flask import request, make_response from flask import request, make_response
import validators import validators
@ -24,7 +25,7 @@ class Watch(Resource):
abort(404, message='No watch exists with the UUID of {}'.format(uuid)) abort(404, message='No watch exists with the UUID of {}'.format(uuid))
if request.args.get('recheck'): if request.args.get('recheck'):
self.update_q.put((1, uuid)) self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': True}))
return "OK", 200 return "OK", 200
# Return without history, get that via another API call # Return without history, get that via another API call
@ -100,7 +101,7 @@ class CreateWatch(Resource):
extras = {'title': json_data['title'].strip()} if json_data.get('title') else {} extras = {'title': json_data['title'].strip()} if json_data.get('title') else {}
new_uuid = self.datastore.add_watch(url=json_data['url'].strip(), tag=tag, extras=extras) new_uuid = self.datastore.add_watch(url=json_data['url'].strip(), tag=tag, extras=extras)
self.update_q.put((1, new_uuid)) self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid, 'skip_when_checksum_same': True}))
return {'uuid': new_uuid}, 201 return {'uuid': new_uuid}, 201
# Return concise list of available watches and some very basic info # Return concise list of available watches and some very basic info
@ -118,7 +119,7 @@ class CreateWatch(Resource):
if request.args.get('recheck_all'): if request.args.get('recheck_all'):
for uuid in self.datastore.data['watching'].keys(): for uuid in self.datastore.data['watching'].keys():
self.update_q.put((1, uuid)) self.update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': True}))
return {'status': "OK"}, 200 return {'status': "OK"}, 200
return list, 200 return list, 200

@ -3,11 +3,13 @@ from distutils.util import strtobool
from flask import Blueprint, flash, redirect, url_for from flask import Blueprint, flash, redirect, url_for
from flask_login import login_required from flask_login import login_required
from changedetectionio.store import ChangeDetectionStore from changedetectionio.store import ChangeDetectionStore
from changedetectionio import queuedWatchMetaData
from queue import PriorityQueue
PRICE_DATA_TRACK_ACCEPT = 'accepted' PRICE_DATA_TRACK_ACCEPT = 'accepted'
PRICE_DATA_TRACK_REJECT = 'rejected' PRICE_DATA_TRACK_REJECT = 'rejected'
def construct_blueprint(datastore: ChangeDetectionStore): def construct_blueprint(datastore: ChangeDetectionStore, update_q: PriorityQueue):
price_data_follower_blueprint = Blueprint('price_data_follower', __name__) price_data_follower_blueprint = Blueprint('price_data_follower', __name__)
@ -15,6 +17,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
@price_data_follower_blueprint.route("/<string:uuid>/accept", methods=['GET']) @price_data_follower_blueprint.route("/<string:uuid>/accept", methods=['GET'])
def accept(uuid): def accept(uuid):
datastore.data['watching'][uuid]['track_ldjson_price_data'] = PRICE_DATA_TRACK_ACCEPT datastore.data['watching'][uuid]['track_ldjson_price_data'] = PRICE_DATA_TRACK_ACCEPT
update_q.put(queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid, 'skip_when_checksum_same': False}))
return redirect(url_for("form_watch_checknow", uuid=uuid)) return redirect(url_for("form_watch_checknow", uuid=uuid))

@ -23,6 +23,9 @@ class Non200ErrorCodeReceived(Exception):
self.page_text = html_tools.html_to_text(page_html) self.page_text = html_tools.html_to_text(page_html)
return return
class checksumFromPreviousCheckWasTheSame(Exception):
def __init__(self):
return
class JSActionExceptions(Exception): class JSActionExceptions(Exception):
def __init__(self, status_code, url, screenshot, message=''): def __init__(self, status_code, url, screenshot, message=''):

@ -6,6 +6,7 @@ import urllib3
from changedetectionio import content_fetcher, html_tools from changedetectionio import content_fetcher, html_tools
from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT
from copy import deepcopy
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@ -38,8 +39,7 @@ class perform_site_check():
return regex return regex
def run(self, uuid): def run(self, uuid, skip_when_checksum_same=True):
from copy import deepcopy
changed_detected = False changed_detected = False
screenshot = False # as bytes screenshot = False # as bytes
stripped_text_from_html = "" stripped_text_from_html = ""
@ -122,6 +122,14 @@ class perform_site_check():
self.screenshot = fetcher.screenshot self.screenshot = fetcher.screenshot
self.xpath_data = fetcher.xpath_data self.xpath_data = fetcher.xpath_data
# Watches added automatically in the queue manager will skip if its the same checksum as the previous run
# Saves a lot of CPU
update_obj['previous_md5_before_filters'] = hashlib.md5(fetcher.content.encode('utf-8')).hexdigest()
if skip_when_checksum_same:
if update_obj['previous_md5_before_filters'] == watch.get('previous_md5_before_filters'):
raise content_fetcher.checksumFromPreviousCheckWasTheSame()
# Fetching complete, now filters # Fetching complete, now filters
# @todo move to class / maybe inside of fetcher abstract base? # @todo move to class / maybe inside of fetcher abstract base?

@ -14,10 +14,10 @@ from changedetectionio.notification import (
class model(dict): class model(dict):
__newest_history_key = None __newest_history_key = None
__history_n=0 __history_n = 0
__base_config = { __base_config = {
#'history': {}, # Dict of timestamp and output stripped filename (removed) # 'history': {}, # Dict of timestamp and output stripped filename (removed)
#'newest_history_key': 0, (removed, taken from history.txt index) # 'newest_history_key': 0, (removed, taken from history.txt index)
'body': None, 'body': None,
'check_unique_lines': False, # On change-detected, compare against all history if its something new 'check_unique_lines': False, # On change-detected, compare against all history if its something new
'check_count': 0, 'check_count': 0,
@ -44,6 +44,7 @@ class model(dict):
'notification_urls': [], # List of URLs to add to the notification Queue (Usually AppRise) 'notification_urls': [], # List of URLs to add to the notification Queue (Usually AppRise)
'paused': False, 'paused': False,
'previous_md5': False, 'previous_md5': False,
'previous_md5_before_filters': False, # Used for skipping changedetection entirely
'proxy': None, # Preferred proxy connection 'proxy': None, # Preferred proxy connection
'subtractive_selectors': [], 'subtractive_selectors': [],
'tag': None, 'tag': None,

@ -0,0 +1,10 @@
from dataclasses import dataclass, field
from typing import Any
# So that we can queue some metadata in `item`
# https://docs.python.org/3/library/queue.html#queue.PriorityQueue
#
@dataclass(order=True)
class PrioritizedItem:
priority: int
item: Any=field(compare=False)

@ -4,6 +4,7 @@ import queue
import time import time
from changedetectionio import content_fetcher from changedetectionio import content_fetcher
from changedetectionio import queuedWatchMetaData
from changedetectionio.fetch_site_status import FilterNotFoundInResponse from changedetectionio.fetch_site_status import FilterNotFoundInResponse
# A single update worker # A single update worker
@ -157,11 +158,12 @@ class update_worker(threading.Thread):
while not self.app.config.exit.is_set(): while not self.app.config.exit.is_set():
try: try:
priority, uuid = self.q.get(block=False) queued_item_data = self.q.get(block=False)
except queue.Empty: except queue.Empty:
pass pass
else: else:
uuid = queued_item_data.item.get('uuid')
self.current_uuid = uuid self.current_uuid = uuid
if uuid in list(self.datastore.data['watching'].keys()): if uuid in list(self.datastore.data['watching'].keys()):
@ -171,11 +173,11 @@ class update_worker(threading.Thread):
update_obj= {} update_obj= {}
xpath_data = False xpath_data = False
process_changedetection_results = True process_changedetection_results = True
print("> Processing UUID {} Priority {} URL {}".format(uuid, priority, self.datastore.data['watching'][uuid]['url'])) print("> Processing UUID {} Priority {} URL {}".format(uuid, queued_item_data.priority, self.datastore.data['watching'][uuid]['url']))
now = time.time() now = time.time()
try: try:
changed_detected, update_obj, contents = update_handler.run(uuid) changed_detected, update_obj, contents = update_handler.run(uuid, skip_when_checksum_same=queued_item_data.item.get('skip_when_checksum_same'))
# Re #342 # Re #342
# In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes. # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
# We then convert/.decode('utf-8') for the notification etc # We then convert/.decode('utf-8') for the notification etc
@ -241,6 +243,10 @@ class update_worker(threading.Thread):
process_changedetection_results = True process_changedetection_results = True
except content_fetcher.checksumFromPreviousCheckWasTheSame as e:
# Yes fine, so nothing todo
pass
except content_fetcher.BrowserStepsStepTimout as e: except content_fetcher.BrowserStepsStepTimout as e:
if not self.datastore.data['watching'].get(uuid): if not self.datastore.data['watching'].get(uuid):

Loading…
Cancel
Save