Use a pool of thread workers, better for huge lists of watchers
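
Instead of spawning one new thread per watch on every recheck, keep a fixed pool of worker threads that consume watch UUIDs from a shared queue. A minimal sketch of the pattern (illustrative only; check_site is a hypothetical stand-in for the real per-watch check):

    import queue
    import threading

    q = queue.Queue()

    def worker():
        while True:
            uuid = q.get()            # blocks until a UUID is queued
            check_site(uuid)          # hypothetical: fetch the page and diff it
            q.task_done()

    for _ in range(10):               # pool size; matches the new 'workers' setting
        threading.Thread(target=worker, daemon=True).start()

    q.put("some-watch-uuid")          # producers (UI routes, the ticker) just enqueue
    q.join()                          # optionally block until the queue drains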

pull/11/head
Leigh Morresi 4 years ago
parent 704b8daa6d
commit f1c2ece32f

@@ -25,19 +25,24 @@ import datetime
import timeago
import threading
import queue
from flask import Flask, render_template, request, send_file, send_from_directory, safe_join, abort, redirect, url_for
# Local
import store
import fetch_site_status
running_update_threads = []
ticker_thread = None
datastore = store.ChangeDetectionStore()
messages = []
extra_stylesheets = []
running_update_threads = {}
update_q = queue.Queue()
app = Flask(__name__, static_url_path='/static')
app.config['STATIC_RESOURCES'] = "/app/static"
@@ -52,9 +57,9 @@ app.config['TEMPLATES_AUTO_RELOAD'] = True
# running or something similar.
@app.template_filter('format_last_checked_time')
def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
global running_update_threads
if watch_obj['uuid'] in running_update_threads:
if running_update_threads[watch_obj['uuid']].is_alive():
# Worker thread tells us which UUID it is currently processing.
for t in running_update_threads:
if t.current_uuid == watch_obj['uuid']:
return "Checking now.."
if watch_obj['last_checked'] == 0:
@@ -261,8 +266,8 @@ def selfcheck():
if not uuid in path:
output = "Something weird in {}, suspected incorrect snapshot path.".format(uuid)
return output
@app.route("/static/<string:group>/<string:filename>", methods=['GET'])
def static_content(group, filename):
try:
@@ -292,17 +297,12 @@ def api_delete():
return redirect(url_for('main_page'))
@app.route("/api/checknow", methods=['GET'])
def api_watch_checknow():
global messages
uuid = request.args.get('uuid')
running_update_threads[uuid] = fetch_site_status.perform_site_check(uuid=uuid,
datastore=datastore)
running_update_threads[uuid].start()
update_q.put(uuid)
tag = request.args.get('tag')
return redirect(url_for('main_page', tag=tag))
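
The route now only enqueues the UUID and returns immediately; a Worker picks the job up in the background. For example (sketch, assuming the default port from main(); watch_uuid is a placeholder):

    import requests
    # enqueues the check and redirects back to the main page while a Worker runs it
    requests.get("http://localhost:5000/api/checknow", params={"uuid": watch_uuid})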
@@ -310,50 +310,65 @@ def api_watch_checknow():
@app.route("/api/recheckall", methods=['GET'])
def api_watch_recheckall():
import fetch_site_status
global running_update_threads
i = 0
for uuid, watch in datastore.data['watching'].items():
i = i + 1
update_q.put(uuid)
running_update_threads[watch['uuid']] = fetch_site_status.perform_site_check(uuid=uuid,
datastore=datastore)
running_update_threads[watch['uuid']].start()
return "Triggered recheck of {} watches.".format(len(datastore.data['watching']))
return "{} triggered recheck of {} watches.".format(i, len(datastore.data['watching']))
# Requests for checking on the site use a pool of thread Workers managed by a Queue.
class Worker(threading.Thread):
# Can be used whenever, launch threads that need launching to update the stored information
def launch_checks():
import fetch_site_status
global running_update_threads
current_uuid = None
def __init__(self, q, *args, **kwargs):
self.q = q
super().__init__(*args, **kwargs)
minutes = datastore.data['settings']['requests']['minutes_between_check']
for uuid, watch in datastore.data['watching'].items():
#@Todo https://pymotw.com/2/Queue/
if watch['last_checked'] <= time.time() - (minutes * 60):
running_update_threads[watch['uuid']] = fetch_site_status.perform_site_check(uuid=uuid,
datastore=datastore)
running_update_threads[watch['uuid']].start()
def run(self):
try:
while True:
uuid = self.q.get() # Blocking
self.current_uuid = uuid
fetch_site_status.perform_site_check(uuid=uuid, datastore=datastore)
self.current_uuid = None # Done
self.q.task_done()
except KeyboardInterrupt:
return
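
How the pieces interact, in a minimal sketch (assumes the Worker class above and a loaded datastore; the pool size here is arbitrary):

    workers = [Worker(update_q) for _ in range(4)]
    for w in workers:
        w.start()
    for uuid in datastore.data['watching']:
        update_q.put(uuid)
    update_q.join()    # blocks until task_done() has been called for every item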
# Thread runner to check every minute
# Thread runner: every minute, look for new watches to feed into the Queue.
def ticker_thread_check_time_launch_checks():
# Spin up Workers.
for _ in range(datastore.data['settings']['requests']['workers']):
new_worker = Worker(update_q)
running_update_threads.append(new_worker)
new_worker.start()
# Every minute, check for new UUIDs to follow up on
while True:
launch_checks()
minutes = datastore.data['settings']['requests']['minutes_between_check']
for uuid, watch in datastore.data['watching'].items():
if watch['last_checked'] <= time.time() - (minutes * 60):
update_q.put(uuid)
time.sleep(60)
# Thread runner; this helps with thread/write issues when many operations want to update the JSON
# by just running periodically in one thread.
# by just running periodically in one thread; in CPython, individual dict updates are atomic (thread-safe).
def save_datastore():
try:
while True:
if datastore.needs_write:
datastore.sync_to_json()
time.sleep(5)
except KeyboardInterrupt:
return
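
Why one writer thread is enough: each checker only performs single dict item assignments on datastore.data, which the CPython GIL makes atomic, so the save loop never sees a torn update. A sketch of what update_watch presumably does (hypothetical body; only the method name, data layout, and needs_write come from the real store):

    def update_watch(self, uuid, key, value):
        # a single dict item assignment; atomic under the CPython GIL
        self.data['watching'][uuid][key] = value
        self.needs_write = True    # picked up by save_datastore within ~5 seconds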
def main(argv):
ssl_mode = False
port = 5000
@@ -378,6 +393,7 @@ def main(argv):
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
save_data_thread = threading.Thread(target=save_datastore).start()
# @todo finalise SSL config, but this should get you in the right direction if you need it.

@@ -1,16 +1,15 @@
from threading import Thread
import time
import requests
import hashlib
import os
import re
import html2text
# Not needed due to inscriptis being way better.
#from urlextract import URLExtract
from inscriptis import get_text
# Hmm Polymorphism datastore, thread, etc
class perform_site_check(Thread):
# Doesn't feel right having 'datastore' as a var here; perhaps this class could inherit from datastore/abstract,
# but on the other hand, I don't want a new instantiation of that datastore object every time, because it reads the
# JSON store, sets vars, writes, etc.
class perform_site_check():
def __init__(self, *args, uuid=False, datastore, **kwargs):
super().__init__(*args, **kwargs)
self.timestamp = int(time.time()) # used for storage etc too
@@ -20,6 +19,9 @@ class perform_site_check(Thread):
self.current_md5 = datastore.get_val(uuid, 'previous_md5')
self.output_path = "/datastore/{}".format(self.uuid)
self.ensure_output_path()
self.run()
def save_firefox_screenshot(self, uuid, output):
# @todo call selenium or whatever
return
@@ -59,10 +61,9 @@ class perform_site_check(Thread):
if 'Accept-Encoding' in request_headers and "br" in request_headers['Accept-Encoding']:
request_headers['Accept-Encoding'] = request_headers['Accept-Encoding'].replace(', br', '')
# print("Checking", self.url, request_headers)
print("Checking", self.url)
self.ensure_output_path()
try:
timeout = self.datastore.data['settings']['requests']['timeout']
@@ -78,24 +79,6 @@ class perform_site_check(Thread):
stripped_text_from_html = get_text(r.text)
# @todo This should be a config option.
# Many websites include junk in the links, trackers, etc.. Since we are really a service all about text changes..
# inscriptis handles this much cleaner, probably not needed..
# extractor = URLExtract()
# urls = extractor.find_urls(stripped_text_from_html)
# Remove the urls, longest first so that we dont end up chewing up bigger links with parts of smaller ones.
# if urls:
# urls.sort(key=len, reverse=True)
# for url in urls:
# # Sometimes URLExtract will consider something like 'foobar.com' as a link when that was just text.
# if "://" in url:
# # print ("Stripping link", url)
# stripped_text_from_html = stripped_text_from_html.replace(url, '')
# Usually from networkIO/requests level
except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e:
self.datastore.update_watch(self.uuid, 'last_error', str(e))

@@ -23,7 +23,8 @@ class ChangeDetectionStore:
},
'requests': {
'timeout': 15, # Default 15 seconds
'minutes_between_check': 3 * 60 # Default 3 hours
'minutes_between_check': 3 * 60, # Default 3 hours
'workers': 10 # Number of threads, lower is better for slow connections
}
}
}
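
With these defaults merged, the requests settings resolve to (sketch):

    datastore.data['settings']['requests']
    # -> {'timeout': 15, 'minutes_between_check': 180, 'workers': 10}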
