From f1c2ece32f04952bc02d10d5b51388a6b0cf48b9 Mon Sep 17 00:00:00 2001 From: Leigh Morresi <275001+dgtlmoon@users.noreply.github.com> Date: Tue, 2 Feb 2021 16:29:06 +0100 Subject: [PATCH] Use a pool of thread workers, better for huge lists of watchers --- backend/backend.py | 94 +++++++++++++++++++++--------------- backend/fetch_site_status.py | 35 ++++---------- backend/store.py | 3 +- 3 files changed, 66 insertions(+), 66 deletions(-) diff --git a/backend/backend.py b/backend/backend.py index 8708da62..0406c1bf 100644 --- a/backend/backend.py +++ b/backend/backend.py @@ -25,19 +25,24 @@ import datetime import timeago import threading +import queue + from flask import Flask, render_template, request, send_file, send_from_directory, safe_join, abort, redirect, url_for + # Local import store import fetch_site_status - +running_update_threads = [] ticker_thread = None datastore = store.ChangeDetectionStore() messages = [] extra_stylesheets = [] -running_update_threads = {} + +update_q = queue.Queue() + app = Flask(__name__, static_url_path='/static') app.config['STATIC_RESOURCES'] = "/app/static" @@ -52,9 +57,9 @@ app.config['TEMPLATES_AUTO_RELOAD'] = True # running or something similar. @app.template_filter('format_last_checked_time') def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"): - global running_update_threads - if watch_obj['uuid'] in running_update_threads: - if running_update_threads[watch_obj['uuid']].is_alive(): + # Worker thread tells us which UUID it is currently processing. + for t in running_update_threads: + if t.current_uuid == watch_obj['uuid']: return "Checking now.." 
if watch_obj['last_checked'] == 0: @@ -261,8 +266,8 @@ def selfcheck(): if not uuid in path: output = "Something weird in {}, suspected incorrect snapshot path.".format(uuid) - return output + @app.route("/static//", methods=['GET']) def static_content(group, filename): try: @@ -292,17 +297,12 @@ def api_delete(): return redirect(url_for('main_page')) - - @app.route("/api/checknow", methods=['GET']) def api_watch_checknow(): global messages uuid = request.args.get('uuid') - - running_update_threads[uuid] = fetch_site_status.perform_site_check(uuid=uuid, - datastore=datastore) - running_update_threads[uuid].start() + update_q.put(uuid) tag = request.args.get('tag') return redirect(url_for('main_page', tag=tag)) @@ -310,49 +310,64 @@ def api_watch_checknow(): @app.route("/api/recheckall", methods=['GET']) def api_watch_recheckall(): - import fetch_site_status - global running_update_threads - i = 0 for uuid, watch in datastore.data['watching'].items(): - i = i + 1 - - running_update_threads[watch['uuid']] = fetch_site_status.perform_site_check(uuid=uuid, - datastore=datastore) - running_update_threads[watch['uuid']].start() + update_q.put(uuid) - return "{} triggered recheck of {} watches.".format(i, len(datastore.data['watching'])) + return "Triggered recheck of {} watches.".format(len(datastore.data['watching'])) -# Can be used whenever, launch threads that need launching to update the stored information -def launch_checks(): - import fetch_site_status - global running_update_threads +# Requests for checking on the site use a pool of thread Workers managed by a Queue. 
+class Worker(threading.Thread): + current_uuid = None - minutes = datastore.data['settings']['requests']['minutes_between_check'] - for uuid, watch in datastore.data['watching'].items(): + def __init__(self, q, *args, **kwargs): + self.q = q + super().__init__(*args, **kwargs) -#@Todo https://pymotw.com/2/Queue/ - if watch['last_checked'] <= time.time() - (minutes * 60): - running_update_threads[watch['uuid']] = fetch_site_status.perform_site_check(uuid=uuid, - datastore=datastore) - running_update_threads[watch['uuid']].start() + def run(self): + try: + while True: + uuid = self.q.get() # Blocking + self.current_uuid = uuid + fetch_site_status.perform_site_check(uuid=uuid, datastore=datastore) + self.current_uuid = None # Done + self.q.task_done() + except KeyboardInterrupt: + return -# Thread runner to check every minute +# Thread runner to check every minute, look for new watches to feed into the Queue. def ticker_thread_check_time_launch_checks(): + + # Spin up Workers. + for _ in range(datastore.data['settings']['requests']['workers']): + new_worker = Worker(update_q) + running_update_threads.append(new_worker) + new_worker.start() + + # Every minute check for new UUIDs to follow up on while True: - launch_checks() + minutes = datastore.data['settings']['requests']['minutes_between_check'] + for uuid, watch in datastore.data['watching'].items(): + if watch['last_checked'] <= time.time() - (minutes * 60): + update_q.put(uuid) + time.sleep(60) + # Thread runner, this helps with thread/write issues when there are many operations that want to update the JSON -# by just running periodically in one thread. +# by just running periodically in one thread, according to python, dict updates are threadsafe. 
def save_datastore(): - while True: - if datastore.needs_write: - datastore.sync_to_json() - time.sleep(5) + try: + while True: + if datastore.needs_write: + datastore.sync_to_json() + time.sleep(5) + + except KeyboardInterrupt: + return def main(argv): ssl_mode = False @@ -378,6 +393,7 @@ def main(argv): # @todo handle ctrl break ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start() + save_data_thread = threading.Thread(target=save_datastore).start() # @todo finalise SSL config, but this should get you in the right direction if you need it. diff --git a/backend/fetch_site_status.py b/backend/fetch_site_status.py index 49598b11..8a0e35d2 100644 --- a/backend/fetch_site_status.py +++ b/backend/fetch_site_status.py @@ -1,16 +1,15 @@ -from threading import Thread import time import requests import hashlib import os import re -import html2text -# Not needed due to inscriptis being way better. -#from urlextract import URLExtract from inscriptis import get_text -# Hmm Polymorphism datastore, thread, etc -class perform_site_check(Thread): +# Doesn't feel right having 'datastore' as a var here, perhaps this class can inherit from datastore/abstract +# but on the other hand, I don't want a new instantiation of that datastore object every time, due to it reading the +# JSON store, setting vars, writing etc.
+ +class perform_site_check(): def __init__(self, *args, uuid=False, datastore, **kwargs): super().__init__(*args, **kwargs) self.timestamp = int(time.time()) # used for storage etc too @@ -20,6 +19,9 @@ class perform_site_check(Thread): self.current_md5 = datastore.get_val(uuid, 'previous_md5') self.output_path = "/datastore/{}".format(self.uuid) + self.ensure_output_path() + self.run() + def save_firefox_screenshot(self, uuid, output): # @todo call selenium or whatever return @@ -59,10 +61,9 @@ class perform_site_check(Thread): if 'Accept-Encoding' in request_headers and "br" in request_headers['Accept-Encoding']: request_headers['Accept-Encoding'] = request_headers['Accept-Encoding'].replace(', br', '') -# print("Checking", self.url, request_headers) + print("Checking", self.url) - self.ensure_output_path() try: timeout = self.datastore.data['settings']['requests']['timeout'] @@ -78,24 +79,6 @@ class perform_site_check(Thread): stripped_text_from_html = get_text(r.text) - - # @todo This should be a config option. - # Many websites include junk in the links, trackers, etc.. Since we are really a service all about text changes.. - -# inscriptis handles this much cleaner, probably not needed.. -# extractor = URLExtract() -# urls = extractor.find_urls(stripped_text_from_html) - # Remove the urls, longest first so that we dont end up chewing up bigger links with parts of smaller ones. -# if urls: -# urls.sort(key=len, reverse=True) -# for url in urls: -# # Sometimes URLExtract will consider something like 'foobar.com' as a link when that was just text. 
-# if "://" in url: -# # print ("Stripping link", url) -# stripped_text_from_html = stripped_text_from_html.replace(url, '') - - - # Usually from networkIO/requests level except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e: self.datastore.update_watch(self.uuid, 'last_error', str(e)) diff --git a/backend/store.py b/backend/store.py index 31a7447b..c8cc5618 100644 --- a/backend/store.py +++ b/backend/store.py @@ -23,7 +23,8 @@ class ChangeDetectionStore: }, 'requests': { 'timeout': 15, # Default 15 seconds - 'minutes_between_check': 3 * 60 # Default 3 hours + 'minutes_between_check': 3 * 60, # Default 3 hours + 'workers': 10 # Number of threads, lower is better for slow connections } } }