import importlib
import os
import re
import threading
import queue
import time

from distutils.util import strtobool
from changedetectionio import content_fetcher, html_tools
from .processors.text_json_diff import FilterNotFoundInResponse
from .processors.restock_diff import UnableToExtractRestockData

# A single update worker
#
# Requests for checking on a single site(watch) from a queue of watches
# (another process inserts watches into the queue that are time-ready for checking)

import logging
import sys


class update_worker(threading.Thread):
    """Thread that takes queued watches one at a time and runs their change check.

    Each queued item is fetched and processed by the watch's configured processor,
    the outcome (or error) is written back to the datastore, and notifications are
    put on ``notification_q`` when a change or a repeated failure is detected.
    """

    # UUID of the watch currently being processed by this worker, None while idle
    current_uuid = None

    def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
        """Store the queues, app and datastore; remaining args go to ``threading.Thread``.

        Args:
            q: Queue of items to check; each item exposes ``.item`` (dict) and ``.priority``.
            notification_q: Queue that receives notification dicts to be sent.
            app: Application object; ``app.config.exit`` and ``app.logger`` are used.
            datastore: Store holding ``data['watching']`` and ``data['settings']``.
        """
        logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
        self.q = q
        self.app = app
        self.notification_q = notification_q
        self.datastore = datastore
        super().__init__(*args, **kwargs)

    def queue_notification_for_watch(self, n_object, watch):
        """Fill ``n_object`` with snapshot/diff data for ``watch`` and queue it.

        Args:
            n_object: Notification dict; must already contain ``notification_format``.
                It is updated in place and then put on ``notification_q``.
            watch: Watch object with at least two history entries.

        Raises:
            IndexError: If the watch has fewer than two history entries.
        """
        from changedetectionio import diff

        dates = list(watch.history.keys())

        # Read each snapshot once and reuse it for every diff flavour below
        latest_snapshot = watch.get_history_snapshot(dates[-1])
        previous_snapshot = watch.get_history_snapshot(dates[-2])

        # HTML needs linebreak, but MarkDown and Text can use a linefeed
        if n_object['notification_format'] == 'HTML':
            line_feed_sep = "<br>"
            # Snapshot will be plaintext on the disk, convert to some kind of HTML
            snapshot_contents = latest_snapshot.replace('\n', line_feed_sep)
        else:
            line_feed_sep = "\n"
            snapshot_contents = latest_snapshot

        # Add text that was triggered
        trigger_text = watch.get('trigger_text', [])
        triggered_text = ''
        if trigger_text:
            triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
            if triggered_text:
                triggered_text = line_feed_sep.join(triggered_text)

        def render(**options):
            # The diffs always compare the raw (unconverted) snapshots
            return diff.render_diff(previous_snapshot, latest_snapshot, line_feed_sep=line_feed_sep, **options)

        n_object.update({
            'current_snapshot': snapshot_contents,
            'diff': render(),
            'diff_added': render(include_removed=False),
            'diff_full': render(include_equal=True),
            'diff_patch': render(patch_format=True),
            'diff_removed': render(include_added=False),
            'screenshot': watch.get_screenshot() if watch.get('notification_screenshot') else None,
            'triggered_text': triggered_text,
            'uuid': watch.get('uuid'),
            'watch_url': watch.get('url'),
        })
        logging.info(">> SENDING NOTIFICATION")
        self.notification_q.put(n_object)

    # Prefer - Individual watch settings > Tag settings > Global settings (in that order)
    def _check_cascading_vars(self, var_name, watch):
        """Resolve a notification setting for ``watch``: watch, then tags, then global.

        Args:
            var_name: Setting key, e.g. ``notification_urls`` or ``notification_format``.
            watch: The watch whose effective setting is wanted.

        Returns:
            The first non-empty value found, a built-in default for format/body/title,
            or None when nothing applies.
        """
        from changedetectionio.notification import (
            default_notification_format_for_watch,
            default_notification_body,
            default_notification_title
        )

        app_settings = self.datastore.data['settings']['application']

        # Would be better if this was some kind of Object where Watch can reference the parent datastore etc
        v = watch.get(var_name)
        if v and not watch.get('notification_muted'):
            # The watch-level "default format" marker defers to the application-wide format
            if var_name == 'notification_format' and v == default_notification_format_for_watch:
                return app_settings.get('notification_format')
            return v

        tags = self.datastore.get_all_tags_for_watch(uuid=watch.get('uuid'))
        if tags:
            for tag_uuid, tag in tags.items():
                v = tag.get(var_name)
                if v and not tag.get('notification_muted'):
                    return v

        if app_settings.get(var_name):
            return app_settings.get(var_name)

        # Otherwise could be defaults
        if var_name == 'notification_format':
            return default_notification_format_for_watch
        if var_name == 'notification_body':
            return default_notification_body
        if var_name == 'notification_title':
            return default_notification_title

        return None

    def send_content_changed_notification(self, watch_uuid):
        """Queue a "content changed" notification for the watch, if it has any URLs.

        Returns:
            True when a notification was queued, False when no notification URLs
            resolved, None when the watch does not exist.

        Raises:
            ValueError: If only one history entry could be loaded.
        """
        watch = self.datastore.data['watching'].get(watch_uuid)
        if not watch:
            return

        dates = list(watch.history.keys())
        # Theoretically it's possible that this could be just 1 long,
        # - In the case that the timestamp key was not unique
        if len(dates) == 1:
            raise ValueError(
                "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
            )

        # Should be a better parent getter in the model object
        # Prefer - Individual watch settings > Tag settings > Global settings (in that order)
        n_object = {
            'notification_urls': self._check_cascading_vars('notification_urls', watch),
            'notification_title': self._check_cascading_vars('notification_title', watch),
            'notification_body': self._check_cascading_vars('notification_body', watch),
            'notification_format': self._check_cascading_vars('notification_format', watch),
        }

        # (Individual watch) Only prepare to notify if the rules above matched
        queued = False
        if n_object.get('notification_urls'):
            queued = True
            self.queue_notification_for_watch(n_object, watch)

        return queued

    def _alert_notification_urls(self, watch):
        """Return the watch's own notification URLs, else the global ones, else None."""
        if len(watch['notification_urls']):
            return watch['notification_urls']
        app_urls = self.datastore.data['settings']['application']['notification_urls']
        if len(app_urls):
            return app_urls
        return None

    def send_filter_failure_notification(self, watch_uuid):
        """Queue an alert that the watch's CSS/xPath filters were not found on the page.

        Nothing is queued when the watch is gone or no notification URLs are set on
        the watch or globally.
        """
        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')
        watch = self.datastore.data['watching'].get(watch_uuid)
        if not watch:
            return

        # Quadruple braces survive .format() as {{token}} placeholders in the body
        n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
                    'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format(
                        ", ".join(watch['include_filters']),
                        threshold),
                    'notification_format': 'text'}

        notification_urls = self._alert_notification_urls(watch)

        # Only prepare to notify if the rules above matched
        if notification_urls is not None:
            n_object.update({
                'notification_urls': notification_urls,
                'watch_url': watch['url'],
                'uuid': watch_uuid,
                'screenshot': None
            })
            self.notification_q.put(n_object)
            print("Sent filter not found notification for {}".format(watch_uuid))

    def send_step_failure_notification(self, watch_uuid, step_n):
        """Queue an alert that a browser step could not be run.

        Args:
            watch_uuid: UUID of the affected watch.
            step_n: Zero-based index of the failing step (reported one-based).
        """
        watch = self.datastore.data['watching'].get(watch_uuid, False)
        if not watch:
            return
        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')

        # Quadruple braces are required so .format() leaves {{token}} placeholders in
        # the body, the same form send_filter_failure_notification() emits.
        n_object = {'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n+1),
                    'notification_body': "Your configured browser step at position {} for {{{{watch_url}}}} "
                                         "did not appear on the page after {} attempts, did the page change layout? "
                                         "Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n"
                                         "Thanks - Your omniscient changedetection.io installation :)\n".format(step_n+1, threshold),
                    'notification_format': 'text'}

        notification_urls = self._alert_notification_urls(watch)

        # Only prepare to notify if the rules above matched
        if notification_urls is not None:
            n_object.update({
                'notification_urls': notification_urls,
                'watch_url': watch['url'],
                'uuid': watch_uuid
            })
            self.notification_q.put(n_object)
            print("Sent step not found notification for {}".format(watch_uuid))

    def cleanup_error_artifacts(self, uuid):
        """Delete the saved error screenshot and error text of a watch, if present."""
        # All went fine, remove error artifacts
        cleanup_files = ["last-error-screenshot.png", "last-error.txt"]
        for f in cleanup_files:
            full_path = os.path.join(self.datastore.datastore_path, uuid, f)
            if os.path.isfile(full_path):
                os.unlink(full_path)

    def _register_consecutive_failure(self, uuid, label, send_notification):
        """Bump the watch's consecutive failure counter and notify at the threshold.

        Args:
            uuid: UUID of the watch (must still exist in the datastore).
            label: Word used in the progress message ("Filter" or "Step").
            send_notification: Zero-argument callable that queues the alert.
        """
        c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
        c += 1
        # Send notification if we reached the threshold?
        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
        print("{} for {} not found, consecutive_filter_failures: {}".format(label, uuid, c))
        if threshold > 0 and c >= threshold:
            if not self.datastore.data['watching'][uuid].get('notification_muted'):
                send_notification()
            # Counter restarts once the threshold was reached, muted or not
            c = 0

        self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})

    def _process_queued_item(self, queued_item_data, uuid):
        """Run one check for ``uuid`` and persist its result, error state and stats.

        Returns early, without recording anything further, when the watch does not
        exist (any more) or has no URL.
        """
        if uuid not in self.datastore.data['watching'] or not self.datastore.data['watching'][uuid].get('url'):
            return

        change_processor = None
        changed_detected = False
        contents = b''
        process_changedetection_results = True
        update_obj = {}
        print("> Processing UUID {} Priority {} URL {}".format(uuid, queued_item_data.priority,
                                                               self.datastore.data['watching'][uuid]['url']))
        now = time.time()

        try:
            # Protect against file:// access
            if re.search(r'^file://', self.datastore.data['watching'][uuid].get('url', '').strip(), re.IGNORECASE):
                if not strtobool(os.getenv('ALLOW_FILE_URI', 'false')):
                    raise Exception(
                        "file:// type access is denied for security reasons."
                    )

            prefer_fetch_backend = self.datastore.data['watching'][uuid].get('fetch_backend', 'system')
            if not prefer_fetch_backend or prefer_fetch_backend == 'system':
                prefer_fetch_backend = self.datastore.data['settings']['application'].get('fetch_backend')

            processor = self.datastore.data['watching'][uuid].get('processor', 'text_json_diff')

            if processor in ['text_json_diff', 'restock_diff']:
                base_processor_module = f"changedetectionio.processors.{processor}"
            else:
                # Each plugin is one processor exactly
                base_processor_module = f"{processor}.processor"

            # its correct that processor dictates which fetcher it uses i think
            # these should inherit the right fetcher too
            module = importlib.import_module(base_processor_module)
            # Keep the class in its own name so change_processor is only ever None or an instance
            processor_class = getattr(module, 'perform_site_check')
            change_processor = processor_class(datastore=self.datastore,
                                               watch_uuid=uuid,
                                               prefer_fetch_backend=prefer_fetch_backend
                                               )

            # Clear last errors (move to preflight func?)
            self.datastore.data['watching'][uuid]['browser_steps_last_error_step'] = None

            skip_when_same_checksum = queued_item_data.item.get('skip_when_checksum_same')

            # Each processor extends base class of the kind of fetcher it needs to run anyway
            change_processor.fetch_content()
            changed_detected, update_obj, contents = change_processor.run_changedetection(uuid,
                                                                                          skip_when_checksum_same=skip_when_same_checksum
                                                                                          )

            # Re #342
            # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
            # We then convert/.decode('utf-8') for the notification etc
            if not isinstance(contents, (bytes, bytearray)):
                raise Exception("Error - returned data from the fetch handler SHOULD be bytes")

        except PermissionError as e:
            self.app.logger.error("File permission error updating %s - %s", uuid, str(e))
            process_changedetection_results = False

        except content_fetcher.ReplyWithContentButNoText as e:
            # Totally fine, it's by choice - just continue on, nothing more to care about
            # Page had elements/content but no renderable text
            # Backend (not filters) gave zero output
            extra_help = ""
            if e.has_filters:
                # Maybe it contains an image? offer a more helpful link
                has_img = html_tools.include_filters(include_filters='img',
                                                     html_content=e.html_content)
                if has_img:
                    extra_help = ", it's possible that the filters you have give an empty result or contain only an image."
                else:
                    extra_help = ", it's possible that the filters were found, but contained no usable text."

            self.datastore.update_watch(uuid=uuid, update_obj={
                'last_error': f"Got HTML content but no text found (With {e.status_code} reply code){extra_help}"
            })

            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot)
            process_changedetection_results = False

        except content_fetcher.Non200ErrorCodeReceived as e:
            if e.status_code == 403:
                err_text = "Error - 403 (Access denied) received"
            elif e.status_code == 404:
                err_text = "Error - 404 (Page not found) received"
            elif e.status_code == 500:
                err_text = "Error - 500 (Internal server Error) received"
            else:
                err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code))

            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
            if e.xpath_data:
                self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True)
            if e.page_text:
                self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text)

            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})
            process_changedetection_results = False

        except FilterNotFoundInResponse as e:
            # The watch could have been removed while the fetch was running
            if not self.datastore.data['watching'].get(uuid):
                return

            err_text = "Warning, no filters were found, no change detection ran - Did the page change layout? update your Visual Filter if necessary."
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})

            # Only when enabled, send the notification
            if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False):
                self._register_consecutive_failure(
                    uuid, "Filter",
                    lambda: self.send_filter_failure_notification(uuid)
                )

            process_changedetection_results = False

        except content_fetcher.checksumFromPreviousCheckWasTheSame as e:
            # Yes fine, so nothing todo, don't continue to process.
            process_changedetection_results = False
            changed_detected = False
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': False})

        except content_fetcher.BrowserStepsStepTimout as e:
            # The watch could have been removed while the fetch was running
            if not self.datastore.data['watching'].get(uuid):
                return

            error_step = e.step_n + 1
            err_text = f"Warning, browser step at position {error_step} could not run, target not found, check the watch, add a delay if necessary, view Browser Steps to see screenshot at that step"
            self.datastore.update_watch(uuid=uuid,
                                        update_obj={'last_error': err_text,
                                                    'browser_steps_last_error_step': error_step
                                                    }
                                        )

            if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False):
                failed_step_n = e.step_n
                self._register_consecutive_failure(
                    uuid, "Step",
                    lambda: self.send_step_failure_notification(watch_uuid=uuid, step_n=failed_step_n)
                )

            process_changedetection_results = False

        except content_fetcher.EmptyReply as e:
            # Some kind of custom to-str handler in the exception handler that does this?
            err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            process_changedetection_results = False

        except content_fetcher.ScreenshotUnavailable as e:
            err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            process_changedetection_results = False

        except content_fetcher.JSActionExceptions as e:
            err_text = "Error running JS Actions - Page request - "+e.message
            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code})
            process_changedetection_results = False

        except content_fetcher.PageUnloadable as e:
            err_text = "Page request from server didnt respond correctly"
            if e.message:
                err_text = "{} - {}".format(err_text, e.message)

            if e.screenshot:
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)

            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
                                                               'last_check_status': e.status_code,
                                                               'has_ldjson_price_data': None})
            process_changedetection_results = False

        except UnableToExtractRestockData as e:
            # Usually when fetcher.instock_data returns empty
            self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': f"Unable to extract restock data for this page unfortunately. (Got code {e.status_code} from server)"})
            process_changedetection_results = False

        except Exception as e:
            self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
            # Other serious error
            process_changedetection_results = False

        else:
            # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
            if not self.datastore.data['watching'].get(uuid):
                return

            # Mark that we never had any failures
            if not self.datastore.data['watching'][uuid].get('ignore_status_codes'):
                update_obj['consecutive_filter_failures'] = 0

            # Everything ran OK, clean off any previous error
            update_obj['last_error'] = False

            self.cleanup_error_artifacts(uuid)

        #
        # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
        if process_changedetection_results:
            try:
                watch = self.datastore.data['watching'].get(uuid)
                self.datastore.update_watch(uuid=uuid, update_obj=update_obj)

                # Also save the snapshot on the first time checked
                if changed_detected or not watch['last_checked']:
                    watch.save_history_text(contents=contents,
                                            timestamp=str(round(time.time())),
                                            snapshot_id=update_obj.get('previous_md5', 'none'))

                # A change was detected
                if changed_detected:
                    print(">> Change detected in UUID {} - {}".format(uuid, watch['url']))

                    # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
                    if watch.history_n >= 2:
                        if not self.datastore.data['watching'][uuid].get('notification_muted'):
                            self.send_content_changed_notification(watch_uuid=uuid)

            except Exception as e:
                # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
                print("!!!! Exception in update_worker !!!\n", e)
                self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})

        if self.datastore.data['watching'].get(uuid):
            # Always record that we atleast tried
            count = self.datastore.data['watching'][uuid].get('check_count', 0) + 1
            self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
                                                               'last_checked': round(time.time()),
                                                               'check_count': count
                                                               })

            # Always save the screenshot if it's available.
            # change_processor stays None when the check failed before a processor was built.
            if change_processor is not None:
                screenshot = getattr(change_processor, 'screenshot', None)
                if screenshot:
                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=screenshot)
                xpath_data = getattr(change_processor, 'xpath_data', None)
                if xpath_data:
                    self.datastore.save_xpath_data(watch_uuid=uuid, data=xpath_data)

    def run(self):
        """Process queued watches until the application's exit event is set."""
        while not self.app.config.exit.is_set():
            try:
                queued_item_data = self.q.get(block=False)
            except queue.Empty:
                pass
            else:
                uuid = queued_item_data.item.get('uuid')
                self.current_uuid = uuid
                try:
                    self._process_queued_item(queued_item_data, uuid)
                finally:
                    # Every get() is matched by a task_done(), even on an early exit
                    self.current_uuid = None  # Done
                    self.q.task_done()

                # Give the CPU time to interrupt
                time.sleep(0.1)

            self.app.config.exit.wait(1)