From c3c0f62662fc4e4761165ae7edb8afa59cc09e6f Mon Sep 17 00:00:00 2001
From: dgtlmoon
Date: Mon, 29 Aug 2022 17:19:10 +0200
Subject: [PATCH] WIP

---
 changedetectionio/__init__.py      | 100 ++++-----
 changedetectionio/api/api_v1.py    |  12 +-
 changedetectionio/update_worker.py | 342 ++++++++++++++---------------
 requirements.txt                   |   2 +
 4 files changed, 217 insertions(+), 239 deletions(-)

diff --git a/changedetectionio/__init__.py b/changedetectionio/__init__.py
index 509bc6f9..3f918fc0 100644
--- a/changedetectionio/__init__.py
+++ b/changedetectionio/__init__.py
@@ -18,6 +18,7 @@ import threading
 import time
 from copy import deepcopy
 from threading import Event
+from PriorityThreadPoolExecutor import PriorityThreadPoolExecutor
 
 import flask_login
 import logging
@@ -49,12 +50,12 @@ __version__ = '0.39.18'
 
 datastore = None
 
 # Local
-running_update_threads = []
+running_update_uuids = set()
 ticker_thread = None
 
 extra_stylesheets = []
 
-update_q = queue.PriorityQueue()
+pool = None
 
 notification_q = queue.Queue()
@@ -105,10 +106,9 @@ def init_app_secret(datastore_path):
 # running or something similar.
 @app.template_filter('format_last_checked_time')
 def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
-    # Worker thread tells us which UUID it is currently processing.
-    for t in running_update_threads:
-        if t.current_uuid == watch_obj['uuid']:
-            return '<span class="loader"></span><span> Checking now</span>'
+
+    if watch_obj['uuid'] in running_update_uuids:
+        return '<span class="loader"></span><span> Checking now</span>'
 
     if watch_obj['last_checked'] == 0:
         return 'Not yet'
@@ -178,13 +178,15 @@ class User(flask_login.UserMixin):
 
 def changedetection_app(config=None, datastore_o=None):
     global datastore
+    global pool
     datastore = datastore_o
 
     # so far just for read-only via tests, but this will be moved eventually to be the main source
     # (instead of the global var)
     app.config['DATASTORE']=datastore_o
 
-    #app.config.update(config or {})
+    pool = PriorityThreadPoolExecutor(max_workers=int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])))
+
 
     login_manager = flask_login.LoginManager(app)
     login_manager.login_view = 'login'
@@ -193,20 +195,17 @@ def changedetection_app(config=None, datastore_o=None):
 
     watch_api.add_resource(api_v1.WatchSingleHistory,
                            '/api/v1/watch/<string:uuid>/history/<string:timestamp>',
-                           resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
+                           resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})
 
     watch_api.add_resource(api_v1.WatchHistory,
                            '/api/v1/watch/<string:uuid>/history',
                            resource_class_kwargs={'datastore': datastore})
 
     watch_api.add_resource(api_v1.CreateWatch, '/api/v1/watch',
-                           resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
+                           resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})
 
     watch_api.add_resource(api_v1.Watch, '/api/v1/watch/<string:uuid>',
-                           resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
-
-
-
+                           resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})
 
     # Setup cors headers to allow all domains
@@ -417,8 +416,7 @@ def changedetection_app(config=None, datastore_o=None):
                                  # Don't link to hosting when we're on the hosting environment
                                  hosted_sticky=os.getenv("SALTED_PASS", False) == False,
                                  guid=datastore.data['app_guid'],
-                                 queued_uuids=[uuid for p,uuid in update_q.queue])
-
+                                 queued_uuids=get_uuids_in_queue())
 
         if session.get('share-link'):
             del(session['share-link'])
@@ -632,7 +630,7 @@ def changedetection_app(config=None, datastore_o=None):
             datastore.needs_write_urgent = True
 
             # Queue the watch for immediate recheck, with a higher priority
-            update_q.put((1, uuid))
+            queue_single_watch(uuid=uuid, priority=1)
 
             # Diff page [edit] link should go back to diff page
             if request.args.get("next") and request.args.get("next") == 'diff':
@@ -749,7 +747,7 @@ def changedetection_app(config=None, datastore_o=None):
                 importer = import_url_list()
                 importer.run(data=request.values.get('urls'), flash=flash, datastore=datastore)
                 for uuid in importer.new_uuids:
-                    update_q.put((1, uuid))
+                    queue_single_watch(uuid=uuid, priority=1)
 
                 if len(importer.remaining_data) == 0:
                     return redirect(url_for('index'))
@@ -762,7 +760,7 @@ def changedetection_app(config=None, datastore_o=None):
                 d_importer = import_distill_io_json()
                 d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore)
                 for uuid in d_importer.new_uuids:
-                    update_q.put((1, uuid))
+                    queue_single_watch(uuid=uuid, priority=1)
 
 
 
@@ -1107,7 +1105,7 @@ def changedetection_app(config=None, datastore_o=None):
 
         if not add_paused and new_uuid:
             # Straight into the queue.
-            update_q.put((1, new_uuid))
+            queue_single_watch(uuid=new_uuid, priority=1)
             flash("Watch added.")
 
         if add_paused:
@@ -1144,7 +1142,7 @@ def changedetection_app(config=None, datastore_o=None):
             uuid = list(datastore.data['watching'].keys()).pop()
 
         new_uuid = datastore.clone(uuid)
-        update_q.put((5, new_uuid))
+        queue_single_watch(uuid=new_uuid, priority=5)
         flash('Cloned.')
 
        return redirect(url_for('index'))
@@ -1157,31 +1155,25 @@ def changedetection_app(config=None, datastore_o=None):
         uuid = request.args.get('uuid')
         i = 0
 
-        running_uuids = []
-        for t in running_update_threads:
-            running_uuids.append(t.current_uuid)
-
-        # @todo check thread is running and skip
-        if uuid:
-            if uuid not in running_uuids:
-                update_q.put((1, uuid))
+        if uuid and uuid not in get_uuids_in_queue():
+            queue_single_watch(uuid=uuid, priority=1)
             i = 1
 
         elif tag != None:
             # Items that have this current tag
             for watch_uuid, watch in datastore.data['watching'].items():
                 if (tag != None and tag in watch['tag']):
-                    if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']:
-                        update_q.put((1, watch_uuid))
+                    if watch_uuid not in get_uuids_in_queue() and not datastore.data['watching'][watch_uuid]['paused']:
+                        queue_single_watch(uuid=watch_uuid, priority=1)
                         i += 1
 
         else:
             # No tag, no uuid, add everything.
             for watch_uuid, watch in datastore.data['watching'].items():
 
-                if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']:
-                    update_q.put((1, watch_uuid))
+                if watch_uuid not in get_uuids_in_queue() and not datastore.data['watching'][watch_uuid]['paused']:
+                    queue_single_watch(uuid=watch_uuid, priority=1)
                     i += 1
         flash("{} watches are queued for rechecking.".format(i))
         return redirect(url_for('index', tag=tag))
@@ -1346,33 +1338,31 @@ def notification_runner():
                 # Trim the log length
                 notification_debug_log = notification_debug_log[-100:]
 
-# Thread runner to check every minute, look for new watches to feed into the Queue.
+def queue_single_watch(uuid, priority=1):
+    pool.submit(process_single_watch, uuid, priority=int(time.time()) - priority)
+
+def process_single_watch(uuid):
+    running_update_uuids.add(uuid)
+    from changedetectionio import update_worker
+    worker = update_worker.update_worker(notification_q=notification_q, datastore=datastore)
+    worker.run(uuid)
+    running_update_uuids.remove(uuid)
+
+def get_uuids_in_queue():
+    return [workitem.args[0] for p, workitem in pool._work_queue.queue]
+
+# Thread runner to load watch jobs into the queue as they become ready/due for checking again
 def ticker_thread_check_time_launch_checks():
     import random
-    from changedetectionio import update_worker
 
     recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
     print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds)
 
-    # Spin up Workers that do the fetching
-    # Can be overriden by ENV or use the default settings
-    n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
-    for _ in range(n_workers):
-        new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
-        running_update_threads.append(new_worker)
-        new_worker.start()
-
     while not app.config.exit.is_set():
 
-        # Get a list of watches by UUID that are currently fetching data
-        running_uuids = []
-        for t in running_update_threads:
-            if t.current_uuid:
-                running_uuids.append(t.current_uuid)
-
         # Re #232 - Deepcopy the data incase it changes while we're iterating through it all
         watch_uuid_list = []
-        while True:
+        while not app.config.exit.is_set():
             try:
                 watch_uuid_list = datastore.data['watching'].keys()
             except RuntimeError as e:
@@ -1382,8 +1372,9 @@ def ticker_thread_check_time_launch_checks():
                 break
 
         # Re #438 - Don't place more watches in the queue to be checked if the queue is already large
-        while update_q.qsize() >= 2000:
-            time.sleep(1)
+        while pool._work_queue.qsize() >= 2000:
+            if not app.config.exit.is_set():
+                time.sleep(1)
 
         recheck_time_system_seconds = int(datastore.threshold_seconds)
 
@@ -1414,7 +1405,8 @@ def ticker_thread_check_time_launch_checks():
             seconds_since_last_recheck = now - watch['last_checked']
 
             if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds:
-                if not uuid in running_uuids and uuid not in [q_uuid for p,q_uuid in update_q.queue]:
+                #@todo check 'not in running_uuids'
+                if uuid not in running_update_uuids and uuid not in get_uuids_in_queue():
                     # Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it.
                     priority = int(time.time())
                     print(
@@ -1425,8 +1417,8 @@ def ticker_thread_check_time_launch_checks():
                             priority,
                             watch.jitter_seconds,
                             now - watch['last_checked']))
-                    # Into the queue with you
-                    update_q.put((priority, uuid))
+
+                    queue_single_watch(uuid=uuid, priority=priority)
 
                     # Reset for next time
                     watch.jitter_seconds = 0
diff --git a/changedetectionio/api/api_v1.py b/changedetectionio/api/api_v1.py
index a432bc67..327e5ea4 100644
--- a/changedetectionio/api/api_v1.py
+++ b/changedetectionio/api/api_v1.py
@@ -1,6 +1,8 @@
 from flask_restful import abort, Resource
 from flask import request, make_response
+
 import validators
+
 from . import auth
 
 
@@ -11,7 +13,7 @@ class Watch(Resource):
     def __init__(self, **kwargs):
         # datastore is a black box dependency
         self.datastore = kwargs['datastore']
-        self.update_q = kwargs['update_q']
+        self.queue_single_watch = kwargs['queue_single_watch']
 
     # Get information about a single watch, excluding the history list (can be large)
     # curl http://localhost:4000/api/v1/watch/<string:uuid>
@@ -24,7 +26,7 @@ class Watch(Resource):
             abort(404, message='No watch exists with the UUID of {}'.format(uuid))
 
         if request.args.get('recheck'):
-            self.update_q.put((1, uuid))
+            self.queue_single_watch(uuid, priority=1)
             return "OK", 200
 
         # Return without history, get that via another API call
@@ -86,7 +88,7 @@ class CreateWatch(Resource):
     def __init__(self, **kwargs):
         # datastore is a black box dependency
         self.datastore = kwargs['datastore']
-        self.update_q = kwargs['update_q']
+        self.queue_single_watch = kwargs['queue_single_watch']
 
     @auth.check_token
     def post(self):
@@ -100,7 +102,7 @@ class CreateWatch(Resource):
         extras = {'title': json_data['title'].strip()} if json_data.get('title') else {}
 
         new_uuid = self.datastore.add_watch(url=json_data['url'].strip(), tag=tag, extras=extras)
-        self.update_q.put((1, new_uuid))
+        self.queue_single_watch(new_uuid, priority=1)
         return {'uuid': new_uuid}, 201
 
     # Return concise list of available watches and some very basic info
@@ -118,7 +120,7 @@ class CreateWatch(Resource):
 
         if request.args.get('recheck_all'):
             for uuid in self.datastore.data['watching'].keys():
-                self.update_q.put((1, uuid))
+                self.queue_single_watch(uuid, priority=1)
             return {'status': "OK"}, 200
 
         return list, 200
diff --git a/changedetectionio/update_worker.py b/changedetectionio/update_worker.py
index d56a9298..08e9bb38 100644
--- a/changedetectionio/update_worker.py
+++ b/changedetectionio/update_worker.py
@@ -1,8 +1,7 @@
+import logging
 import os
-import threading
-import queue
 import time
-
+logging.basicConfig(level=logging.DEBUG)
 from changedetectionio import content_fetcher
 from changedetectionio.html_tools import FilterNotFoundInResponse
 
@@ -12,15 +11,12 @@ from changedetectionio.html_tools import FilterNotFoundInResponse
 # (another process inserts watches into the queue that are time-ready for checking)
 
 
-class update_worker(threading.Thread):
+class update_worker():
     current_uuid = None
 
-    def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
-        self.q = q
-        self.app = app
+    def __init__(self, notification_q, datastore):
         self.notification_q = notification_q
         self.datastore = datastore
-        super().__init__(*args, **kwargs)
 
     def send_content_changed_notification(self, t, watch_uuid):
 
@@ -116,182 +112,168 @@ class update_worker(threading.Thread):
         if os.path.isfile(full_path):
             os.unlink(full_path)
 
-    def run(self):
+    def run(self, uuid):
         from changedetectionio import fetch_site_status
 
         update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)
 
-        while not self.app.config.exit.is_set():
+        self.current_uuid = uuid
 
-            try:
-                priority, uuid = self.q.get(block=False)
-            except queue.Empty:
-                pass
-
+        if uuid in list(self.datastore.data['watching'].keys()):
+            changed_detected = False
+            contents = b''
+            screenshot = False
+            update_obj= {}
+            xpath_data = False
+            process_changedetection_results = True
+            print("> Processing UUID {} Priority {} URL {}".format(uuid, 1, self.datastore.data['watching'][uuid]['url']))
+            now = time.time()
+
+            try:
+                changed_detected, update_obj, contents = update_handler.run(uuid)
+                # Re #342
+                # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
+                # We then convert/.decode('utf-8') for the notification etc
+                if not isinstance(contents, (bytes, bytearray)):
+                    raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
+            except PermissionError as e:
+                logging.error("File permission error updating", uuid, str(e))
+                process_changedetection_results = False
+            except content_fetcher.ReplyWithContentButNoText as e:
+                # Totally fine, it's by choice - just continue on, nothing more to care about
+                # Page had elements/content but no renderable text
+                # Backend (not filters) gave zero output
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found (With {} reply code).".format(e.status_code)})
+                if e.screenshot:
+                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot)
+                process_changedetection_results = False
+
+            except content_fetcher.Non200ErrorCodeReceived as e:
+                if e.status_code == 403:
+                    err_text = "Error - 403 (Access denied) received"
+                elif e.status_code == 404:
+                    err_text = "Error - 404 (Page not found) received"
+                elif e.status_code == 500:
+                    err_text = "Error - 500 (Internal server Error) received"
+                else:
+                    err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code))
+
+                if e.screenshot:
+                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
+                if e.xpath_data:
+                    self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True)
+                if e.page_text:
+                    self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text)
+
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   # So that we get a trigger when the content is added again
+                                                                   'previous_md5': ''})
+                process_changedetection_results = False
+
+            except FilterNotFoundInResponse as e:
+                err_text = "Warning, filter '{}' not found".format(str(e))
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   # So that we get a trigger when the content is added again
+                                                                   'previous_md5': ''})
+
+                # Only when enabled, send the notification
+                if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False):
+                    c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
+                    c += 1
+                    # Send notification if we reached the threshold?
+                    threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts',
+                                                                                   0)
+                    print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c))
+                    if threshold > 0 and c >= threshold:
+                        if not self.datastore.data['watching'][uuid].get('notification_muted'):
+                            self.send_filter_failure_notification(uuid)
+                        c = 0
+
+                    self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
+
+                process_changedetection_results = True
+
+            except content_fetcher.EmptyReply as e:
+                # Some kind of custom to-str handler in the exception handler that does this?
+                err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   'last_check_status': e.status_code})
+            except content_fetcher.ScreenshotUnavailable as e:
+                err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   'last_check_status': e.status_code})
+                process_changedetection_results = False
+            except content_fetcher.JSActionExceptions as e:
+                err_text = "Error running JS Actions - Page request - "+e.message
+                if e.screenshot:
+                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   'last_check_status': e.status_code})
+            except content_fetcher.PageUnloadable as e:
+                err_text = "Page request from server didnt respond correctly"
+                if e.message:
+                    err_text = "{} - {}".format(err_text, e.message)
+
+                if e.screenshot:
+                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
+
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   'last_check_status': e.status_code})
+            except Exception as e:
+                logging.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
+                # Other serious error
+                process_changedetection_results = False
             else:
-                self.current_uuid = uuid
-
-                if uuid in list(self.datastore.data['watching'].keys()):
-                    changed_detected = False
-                    contents = b''
-                    screenshot = False
-                    update_obj= {}
-                    xpath_data = False
-                    process_changedetection_results = True
-                    print("> Processing UUID {} Priority {} URL {}".format(uuid, priority, self.datastore.data['watching'][uuid]['url']))
-                    now = time.time()
-
-                    try:
-                        changed_detected, update_obj, contents = update_handler.run(uuid)
-                        # Re #342
-                        # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
-                        # We then convert/.decode('utf-8') for the notification etc
-                        if not isinstance(contents, (bytes, bytearray)):
-                            raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
-                    except PermissionError as e:
-                        self.app.logger.error("File permission error updating", uuid, str(e))
-                        process_changedetection_results = False
-                    except content_fetcher.ReplyWithContentButNoText as e:
-                        # Totally fine, it's by choice - just continue on, nothing more to care about
-                        # Page had elements/content but no renderable text
-                        # Backend (not filters) gave zero output
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found (With {} reply code).".format(e.status_code)})
-                        if e.screenshot:
-                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot)
-                        process_changedetection_results = False
-
-                    except content_fetcher.Non200ErrorCodeReceived as e:
-                        if e.status_code == 403:
-                            err_text = "Error - 403 (Access denied) received"
-                        elif e.status_code == 404:
-                            err_text = "Error - 404 (Page not found) received"
-                        elif e.status_code == 500:
-                            err_text = "Error - 500 (Internal server Error) received"
-                        else:
-                            err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code))
-
-                        if e.screenshot:
-                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
-                        if e.xpath_data:
-                            self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True)
-                        if e.page_text:
-                            self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text)
-
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           # So that we get a trigger when the content is added again
-                                                                           'previous_md5': ''})
-                        process_changedetection_results = False
-
-                    except FilterNotFoundInResponse as e:
-                        err_text = "Warning, filter '{}' not found".format(str(e))
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           # So that we get a trigger when the content is added again
-                                                                           'previous_md5': ''})
-
-                        # Only when enabled, send the notification
-                        if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False):
-                            c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
-                            c += 1
-                            # Send notification if we reached the threshold?
-                            threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts',
-                                                                                           0)
-                            print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c))
-                            if threshold > 0 and c >= threshold:
-                                if not self.datastore.data['watching'][uuid].get('notification_muted'):
-                                    self.send_filter_failure_notification(uuid)
-                                c = 0
-
-                            self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
-
-                        process_changedetection_results = True
-
-                    except content_fetcher.EmptyReply as e:
-                        # Some kind of custom to-str handler in the exception handler that does this?
-                        err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           'last_check_status': e.status_code})
-                    except content_fetcher.ScreenshotUnavailable as e:
-                        err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           'last_check_status': e.status_code})
-                        process_changedetection_results = False
-                    except content_fetcher.JSActionExceptions as e:
-                        err_text = "Error running JS Actions - Page request - "+e.message
-                        if e.screenshot:
-                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           'last_check_status': e.status_code})
-                    except content_fetcher.PageUnloadable as e:
-                        err_text = "Page request from server didnt respond correctly"
-                        if e.message:
-                            err_text = "{} - {}".format(err_text, e.message)
-
-                        if e.screenshot:
-                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
-
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           'last_check_status': e.status_code})
-                    except Exception as e:
-                        self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
-                        # Other serious error
-                        process_changedetection_results = False
-                    else:
-                        # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
-                        if not self.datastore.data['watching'].get(uuid):
-                            continue
-
-                        # Mark that we never had any failures
-                        if not self.datastore.data['watching'][uuid].get('ignore_status_codes'):
-                            update_obj['consecutive_filter_failures'] = 0
-
-                        self.cleanup_error_artifacts(uuid)
-
-                    # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
-                    if process_changedetection_results:
-                        try:
-                            watch = self.datastore.data['watching'][uuid]
-                            fname = "" # Saved history text filename
-
-                            # For the FIRST time we check a site, or a change detected, save the snapshot.
-                            if changed_detected or not watch['last_checked']:
-                                # A change was detected
-                                watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
-
-                            self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
-
-                            # A change was detected
-                            if changed_detected:
-                                print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
-
-                                # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
-                                if watch.history_n >= 2:
-                                    if not self.datastore.data['watching'][uuid].get('notification_muted'):
-                                        self.send_content_changed_notification(self, watch_uuid=uuid)
-
-
-                        except Exception as e:
-                            # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
-                            print("!!!! Exception in update_worker !!!\n", e)
-                            self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
-                            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
-
-
-                    # Always record that we atleast tried
-                    self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
-                                                                       'last_checked': round(time.time())})
-
-                    # Always save the screenshot if it's available
-                    if update_handler.screenshot:
-                        self.datastore.save_screenshot(watch_uuid=uuid, screenshot=update_handler.screenshot)
-                    if update_handler.xpath_data:
-                        self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data)
-
-
-                self.current_uuid = None # Done
-                self.q.task_done()
-
-                # Give the CPU time to interrupt
-                time.sleep(0.1)
-
-            self.app.config.exit.wait(1)
+                # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
+                if not self.datastore.data['watching'].get(uuid):
+                    return
+
+                # Mark that we never had any failures
+                if not self.datastore.data['watching'][uuid].get('ignore_status_codes'):
+                    update_obj['consecutive_filter_failures'] = 0
+
+                self.cleanup_error_artifacts(uuid)
+
+            # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
+            if process_changedetection_results:
+                try:
+                    watch = self.datastore.data['watching'][uuid]
+                    fname = "" # Saved history text filename
+
+                    # For the FIRST time we check a site, or a change detected, save the snapshot.
+                    if changed_detected or not watch['last_checked']:
+                        # A change was detected
+                        watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
+
+                    self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
+
+                    # A change was detected
+                    if changed_detected:
+                        print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
+
+                        # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
+                        if watch.history_n >= 2:
+                            if not self.datastore.data['watching'][uuid].get('notification_muted'):
+                                self.send_content_changed_notification(self, watch_uuid=uuid)
+
+
+                except Exception as e:
+                    # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
+                    print("!!!! Exception in update_worker !!!\n", e)
+                    logging.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
+                    self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
+
+
+            # Always record that we atleast tried
+            self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
+                                                               'last_checked': round(time.time())})
+
+            # Always save the screenshot if it's available
+            if update_handler.screenshot:
+                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=update_handler.screenshot)
+            if update_handler.xpath_data:
+                self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data)
+
+
+        self.current_uuid = None
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 8aaef292..1d5e7405 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -33,6 +33,8 @@ bs4
 # XPath filtering, lxml is required by bs4 anyway, but put it here to be safe.
 lxml
 
+PriorityThreadPoolExecutor
+
 # 3.141 was missing socksVersion, 3.150 was not in pypi, so we try 4.1.0
 selenium ~= 4.1.0
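
Illustrative sketch (not part of the patch): the change above replaces the shared update_q plus long-lived worker threads with a module-level PriorityThreadPoolExecutor; queue_single_watch() submits one job per watch and update_worker.run(uuid) now checks a single watch per call. A minimal standalone approximation of that flow is below. It assumes the PriorityThreadPoolExecutor package accepts submit(fn, *args, priority=...) exactly as the patch uses it; the names check_watch/queue_watch and the pass-through priority scheme are hypothetical simplifications, not the project's API (the patch itself offsets the priority by epoch time in queue_single_watch()).

    import time
    from PriorityThreadPoolExecutor import PriorityThreadPoolExecutor

    # Stand-ins for the module-level globals set up in changedetectionio/__init__.py
    pool = PriorityThreadPoolExecutor(max_workers=4)
    running_update_uuids = set()

    def check_watch(uuid):
        # Hypothetical stand-in for update_worker.update_worker(...).run(uuid)
        running_update_uuids.add(uuid)
        try:
            print("checking", uuid)
        finally:
            running_update_uuids.discard(uuid)

    def queue_watch(uuid, priority=1):
        # Simplified version of queue_single_watch(): a lower priority value is
        # picked off the executor's priority queue first, so manual rechecks (1)
        # can jump ahead of scheduler-submitted jobs that pass int(time.time()).
        pool.submit(check_watch, uuid, priority=priority)

    queue_watch("some-watch-uuid", priority=1)
    queue_watch("another-watch-uuid", priority=int(time.time()))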