pull/888/head
dgtlmoon 2 years ago
parent f3c7c969d8
commit c3c0f62662

@ -18,6 +18,7 @@ import threading
import time import time
from copy import deepcopy from copy import deepcopy
from threading import Event from threading import Event
from PriorityThreadPoolExecutor import PriorityThreadPoolExecutor
import flask_login import flask_login
import logging import logging
@ -49,12 +50,12 @@ __version__ = '0.39.18'
datastore = None datastore = None
# Local # Local
running_update_threads = [] running_update_uuids = set()
ticker_thread = None ticker_thread = None
extra_stylesheets = [] extra_stylesheets = []
update_q = queue.PriorityQueue() pool = None
notification_q = queue.Queue() notification_q = queue.Queue()
@ -105,9 +106,8 @@ def init_app_secret(datastore_path):
# running or something similar. # running or something similar.
@app.template_filter('format_last_checked_time') @app.template_filter('format_last_checked_time')
def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"): def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
# Worker thread tells us which UUID it is currently processing.
for t in running_update_threads: if watch_obj['uuid'] in running_update_uuids:
if t.current_uuid == watch_obj['uuid']:
return '<span class="loader"></span><span> Checking now</span>' return '<span class="loader"></span><span> Checking now</span>'
if watch_obj['last_checked'] == 0: if watch_obj['last_checked'] == 0:
@ -178,13 +178,15 @@ class User(flask_login.UserMixin):
def changedetection_app(config=None, datastore_o=None): def changedetection_app(config=None, datastore_o=None):
global datastore global datastore
global pool
datastore = datastore_o datastore = datastore_o
# so far just for read-only via tests, but this will be moved eventually to be the main source # so far just for read-only via tests, but this will be moved eventually to be the main source
# (instead of the global var) # (instead of the global var)
app.config['DATASTORE']=datastore_o app.config['DATASTORE']=datastore_o
#app.config.update(config or {}) pool = PriorityThreadPoolExecutor(max_workers=int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])))
login_manager = flask_login.LoginManager(app) login_manager = flask_login.LoginManager(app)
login_manager.login_view = 'login' login_manager.login_view = 'login'
@ -193,20 +195,17 @@ def changedetection_app(config=None, datastore_o=None):
watch_api.add_resource(api_v1.WatchSingleHistory, watch_api.add_resource(api_v1.WatchSingleHistory,
'/api/v1/watch/<string:uuid>/history/<string:timestamp>', '/api/v1/watch/<string:uuid>/history/<string:timestamp>',
resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})
watch_api.add_resource(api_v1.WatchHistory, watch_api.add_resource(api_v1.WatchHistory,
'/api/v1/watch/<string:uuid>/history', '/api/v1/watch/<string:uuid>/history',
resource_class_kwargs={'datastore': datastore}) resource_class_kwargs={'datastore': datastore})
watch_api.add_resource(api_v1.CreateWatch, '/api/v1/watch', watch_api.add_resource(api_v1.CreateWatch, '/api/v1/watch',
resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})
watch_api.add_resource(api_v1.Watch, '/api/v1/watch/<string:uuid>', watch_api.add_resource(api_v1.Watch, '/api/v1/watch/<string:uuid>',
resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})
# Setup cors headers to allow all domains # Setup cors headers to allow all domains
@ -417,8 +416,7 @@ def changedetection_app(config=None, datastore_o=None):
# Don't link to hosting when we're on the hosting environment # Don't link to hosting when we're on the hosting environment
hosted_sticky=os.getenv("SALTED_PASS", False) == False, hosted_sticky=os.getenv("SALTED_PASS", False) == False,
guid=datastore.data['app_guid'], guid=datastore.data['app_guid'],
queued_uuids=[uuid for p,uuid in update_q.queue]) queued_uuids=get_uuids_in_queue())
if session.get('share-link'): if session.get('share-link'):
del(session['share-link']) del(session['share-link'])
@ -632,7 +630,7 @@ def changedetection_app(config=None, datastore_o=None):
datastore.needs_write_urgent = True datastore.needs_write_urgent = True
# Queue the watch for immediate recheck, with a higher priority # Queue the watch for immediate recheck, with a higher priority
update_q.put((1, uuid)) queue_single_watch(uuid=uuid, priority=1)
# Diff page [edit] link should go back to diff page # Diff page [edit] link should go back to diff page
if request.args.get("next") and request.args.get("next") == 'diff': if request.args.get("next") and request.args.get("next") == 'diff':
@ -749,7 +747,7 @@ def changedetection_app(config=None, datastore_o=None):
importer = import_url_list() importer = import_url_list()
importer.run(data=request.values.get('urls'), flash=flash, datastore=datastore) importer.run(data=request.values.get('urls'), flash=flash, datastore=datastore)
for uuid in importer.new_uuids: for uuid in importer.new_uuids:
update_q.put((1, uuid)) queue_single_watch(uuid=uuid, priority=1)
if len(importer.remaining_data) == 0: if len(importer.remaining_data) == 0:
return redirect(url_for('index')) return redirect(url_for('index'))
@ -762,7 +760,7 @@ def changedetection_app(config=None, datastore_o=None):
d_importer = import_distill_io_json() d_importer = import_distill_io_json()
d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore) d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore)
for uuid in d_importer.new_uuids: for uuid in d_importer.new_uuids:
update_q.put((1, uuid)) queue_single_watch(uuid=uuid, priority=1)
@ -1107,7 +1105,7 @@ def changedetection_app(config=None, datastore_o=None):
if not add_paused and new_uuid: if not add_paused and new_uuid:
# Straight into the queue. # Straight into the queue.
update_q.put((1, new_uuid)) queue_single_watch(uuid=uuid, priority=1)
flash("Watch added.") flash("Watch added.")
if add_paused: if add_paused:
@ -1144,7 +1142,7 @@ def changedetection_app(config=None, datastore_o=None):
uuid = list(datastore.data['watching'].keys()).pop() uuid = list(datastore.data['watching'].keys()).pop()
new_uuid = datastore.clone(uuid) new_uuid = datastore.clone(uuid)
update_q.put((5, new_uuid)) queue_single_watch(uuid=uuid, priority=5)
flash('Cloned.') flash('Cloned.')
return redirect(url_for('index')) return redirect(url_for('index'))
@ -1157,31 +1155,25 @@ def changedetection_app(config=None, datastore_o=None):
uuid = request.args.get('uuid') uuid = request.args.get('uuid')
i = 0 i = 0
running_uuids = []
for t in running_update_threads:
running_uuids.append(t.current_uuid)
# @todo check thread is running and skip
if uuid: if uuid:
if uuid not in running_uuids: if uuid not in get_uuids_in_queue():
update_q.put((1, uuid)) queue_single_watch(uuid=uuid, priority=1)
i = 1 i = 1
elif tag != None: elif tag != None:
# Items that have this current tag # Items that have this current tag
for watch_uuid, watch in datastore.data['watching'].items(): for watch_uuid, watch in datastore.data['watching'].items():
if (tag != None and tag in watch['tag']): if (tag != None and tag in watch['tag']):
if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']: if watch_uuid not in get_uuids_in_queue() and not datastore.data['watching'][watch_uuid]['paused']:
update_q.put((1, watch_uuid)) queue_single_watch(uuid=watch_uuid, priority=1)
i += 1 i += 1
else: else:
# No tag, no uuid, add everything. # No tag, no uuid, add everything.
for watch_uuid, watch in datastore.data['watching'].items(): for watch_uuid, watch in datastore.data['watching'].items():
if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']: if watch_uuid not in get_uuids_in_queue() and not datastore.data['watching'][watch_uuid]['paused']:
update_q.put((1, watch_uuid)) queue_single_watch(uuid=watch_uuid, priority=1)
i += 1 i += 1
flash("{} watches are queued for rechecking.".format(i)) flash("{} watches are queued for rechecking.".format(i))
return redirect(url_for('index', tag=tag)) return redirect(url_for('index', tag=tag))
@ -1346,33 +1338,31 @@ def notification_runner():
# Trim the log length # Trim the log length
notification_debug_log = notification_debug_log[-100:] notification_debug_log = notification_debug_log[-100:]
# Thread runner to check every minute, look for new watches to feed into the Queue. def queue_single_watch(uuid, priority=1):
pool.submit(process_single_watch, uuid, priority=int(time.time()) - priority)
def process_single_watch(uuid):
running_update_uuids.add(uuid)
from changedetectionio import update_worker
worker = update_worker.update_worker(notification_q=notification_q, datastore=datastore)
worker.run(uuid)
running_update_uuids.remove(uuid)
def get_uuids_in_queue():
return [workitem.args[0] for p, workitem in pool._work_queue.queue]
# Thread runner to load watch jobs into the queue as they become ready/due for checking again
def ticker_thread_check_time_launch_checks(): def ticker_thread_check_time_launch_checks():
import random import random
from changedetectionio import update_worker
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20)) recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds) print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds)
# Spin up Workers that do the fetching
# Can be overriden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
for _ in range(n_workers):
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()
while not app.config.exit.is_set(): while not app.config.exit.is_set():
# Get a list of watches by UUID that are currently fetching data
running_uuids = []
for t in running_update_threads:
if t.current_uuid:
running_uuids.append(t.current_uuid)
# Re #232 - Deepcopy the data incase it changes while we're iterating through it all # Re #232 - Deepcopy the data incase it changes while we're iterating through it all
watch_uuid_list = [] watch_uuid_list = []
while True: while not app.config.exit.is_set():
try: try:
watch_uuid_list = datastore.data['watching'].keys() watch_uuid_list = datastore.data['watching'].keys()
except RuntimeError as e: except RuntimeError as e:
@ -1382,7 +1372,8 @@ def ticker_thread_check_time_launch_checks():
break break
# Re #438 - Don't place more watches in the queue to be checked if the queue is already large # Re #438 - Don't place more watches in the queue to be checked if the queue is already large
while update_q.qsize() >= 2000: while pool._work_queue.qsize() >= 2000:
if not app.config.exit.is_set():
time.sleep(1) time.sleep(1)
@ -1414,7 +1405,8 @@ def ticker_thread_check_time_launch_checks():
seconds_since_last_recheck = now - watch['last_checked'] seconds_since_last_recheck = now - watch['last_checked']
if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds: if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds:
if not uuid in running_uuids and uuid not in [q_uuid for p,q_uuid in update_q.queue]: #@todo check 'not in running_uuids'
if not uuid and uuid not in get_uuids_in_queue():
# Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it. # Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it.
priority = int(time.time()) priority = int(time.time())
print( print(
@ -1425,8 +1417,8 @@ def ticker_thread_check_time_launch_checks():
priority, priority,
watch.jitter_seconds, watch.jitter_seconds,
now - watch['last_checked'])) now - watch['last_checked']))
# Into the queue with you
update_q.put((priority, uuid)) queue_single_watch(uuid=uuid, priority=priority)
# Reset for next time # Reset for next time
watch.jitter_seconds = 0 watch.jitter_seconds = 0

@ -1,6 +1,8 @@
from flask_restful import abort, Resource from flask_restful import abort, Resource
from flask import request, make_response from flask import request, make_response
import validators import validators
from . import auth from . import auth
@ -11,7 +13,7 @@ class Watch(Resource):
def __init__(self, **kwargs): def __init__(self, **kwargs):
# datastore is a black box dependency # datastore is a black box dependency
self.datastore = kwargs['datastore'] self.datastore = kwargs['datastore']
self.update_q = kwargs['update_q'] self.queue_single_watch = kwargs['queue_single_watch']
# Get information about a single watch, excluding the history list (can be large) # Get information about a single watch, excluding the history list (can be large)
# curl http://localhost:4000/api/v1/watch/<string:uuid> # curl http://localhost:4000/api/v1/watch/<string:uuid>
@ -24,7 +26,7 @@ class Watch(Resource):
abort(404, message='No watch exists with the UUID of {}'.format(uuid)) abort(404, message='No watch exists with the UUID of {}'.format(uuid))
if request.args.get('recheck'): if request.args.get('recheck'):
self.update_q.put((1, uuid)) self.queue_single_watch(uuid, priority=1)
return "OK", 200 return "OK", 200
# Return without history, get that via another API call # Return without history, get that via another API call
@ -86,7 +88,7 @@ class CreateWatch(Resource):
def __init__(self, **kwargs): def __init__(self, **kwargs):
# datastore is a black box dependency # datastore is a black box dependency
self.datastore = kwargs['datastore'] self.datastore = kwargs['datastore']
self.update_q = kwargs['update_q'] self.queue_single_watch = kwargs['queue_single_watch']
@auth.check_token @auth.check_token
def post(self): def post(self):
@ -100,7 +102,7 @@ class CreateWatch(Resource):
extras = {'title': json_data['title'].strip()} if json_data.get('title') else {} extras = {'title': json_data['title'].strip()} if json_data.get('title') else {}
new_uuid = self.datastore.add_watch(url=json_data['url'].strip(), tag=tag, extras=extras) new_uuid = self.datastore.add_watch(url=json_data['url'].strip(), tag=tag, extras=extras)
self.update_q.put((1, new_uuid)) self.queue_single_watch(new_uuid, priority=1)
return {'uuid': new_uuid}, 201 return {'uuid': new_uuid}, 201
# Return concise list of available watches and some very basic info # Return concise list of available watches and some very basic info
@ -118,7 +120,7 @@ class CreateWatch(Resource):
if request.args.get('recheck_all'): if request.args.get('recheck_all'):
for uuid in self.datastore.data['watching'].keys(): for uuid in self.datastore.data['watching'].keys():
self.update_q.put((1, uuid)) self.queue_single_watch(uuid, priority=1)
return {'status': "OK"}, 200 return {'status': "OK"}, 200
return list, 200 return list, 200

@ -1,8 +1,7 @@
import logging
import os import os
import threading
import queue
import time import time
logging.basicConfig(level=logging.DEBUG)
from changedetectionio import content_fetcher from changedetectionio import content_fetcher
from changedetectionio.html_tools import FilterNotFoundInResponse from changedetectionio.html_tools import FilterNotFoundInResponse
@ -12,15 +11,12 @@ from changedetectionio.html_tools import FilterNotFoundInResponse
# (another process inserts watches into the queue that are time-ready for checking) # (another process inserts watches into the queue that are time-ready for checking)
class update_worker(threading.Thread): class update_worker():
current_uuid = None current_uuid = None
def __init__(self, q, notification_q, app, datastore, *args, **kwargs): def __init__(self, notification_q, datastore):
self.q = q
self.app = app
self.notification_q = notification_q self.notification_q = notification_q
self.datastore = datastore self.datastore = datastore
super().__init__(*args, **kwargs)
def send_content_changed_notification(self, t, watch_uuid): def send_content_changed_notification(self, t, watch_uuid):
@ -116,19 +112,11 @@ class update_worker(threading.Thread):
if os.path.isfile(full_path): if os.path.isfile(full_path):
os.unlink(full_path) os.unlink(full_path)
def run(self): def run(self, uuid):
from changedetectionio import fetch_site_status from changedetectionio import fetch_site_status
update_handler = fetch_site_status.perform_site_check(datastore=self.datastore) update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)
while not self.app.config.exit.is_set():
try:
priority, uuid = self.q.get(block=False)
except queue.Empty:
pass
else:
self.current_uuid = uuid self.current_uuid = uuid
if uuid in list(self.datastore.data['watching'].keys()): if uuid in list(self.datastore.data['watching'].keys()):
@ -138,7 +126,7 @@ class update_worker(threading.Thread):
update_obj= {} update_obj= {}
xpath_data = False xpath_data = False
process_changedetection_results = True process_changedetection_results = True
print("> Processing UUID {} Priority {} URL {}".format(uuid, priority, self.datastore.data['watching'][uuid]['url'])) print("> Processing UUID {} Priority {} URL {}".format(uuid, 1, self.datastore.data['watching'][uuid]['url']))
now = time.time() now = time.time()
try: try:
@ -149,7 +137,7 @@ class update_worker(threading.Thread):
if not isinstance(contents, (bytes, bytearray)): if not isinstance(contents, (bytes, bytearray)):
raise Exception("Error - returned data from the fetch handler SHOULD be bytes") raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
except PermissionError as e: except PermissionError as e:
self.app.logger.error("File permission error updating", uuid, str(e)) logging.error("File permission error updating", uuid, str(e))
process_changedetection_results = False process_changedetection_results = False
except content_fetcher.ReplyWithContentButNoText as e: except content_fetcher.ReplyWithContentButNoText as e:
# Totally fine, it's by choice - just continue on, nothing more to care about # Totally fine, it's by choice - just continue on, nothing more to care about
@ -232,14 +220,14 @@ class update_worker(threading.Thread):
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code}) 'last_check_status': e.status_code})
except Exception as e: except Exception as e:
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) logging.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
# Other serious error # Other serious error
process_changedetection_results = False process_changedetection_results = False
else: else:
# Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc) # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
if not self.datastore.data['watching'].get(uuid): if not self.datastore.data['watching'].get(uuid):
continue return
# Mark that we never had any failures # Mark that we never had any failures
if not self.datastore.data['watching'][uuid].get('ignore_status_codes'): if not self.datastore.data['watching'][uuid].get('ignore_status_codes'):
@ -273,7 +261,7 @@ class update_worker(threading.Thread):
except Exception as e: except Exception as e:
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart! # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
print("!!!! Exception in update_worker !!!\n", e) print("!!!! Exception in update_worker !!!\n", e)
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) logging.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
@ -288,10 +276,4 @@ class update_worker(threading.Thread):
self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data) self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data)
self.current_uuid = None # Done self.current_uuid = None
self.q.task_done()
# Give the CPU time to interrupt
time.sleep(0.1)
self.app.config.exit.wait(1)

@ -33,6 +33,8 @@ bs4
# XPath filtering, lxml is required by bs4 anyway, but put it here to be safe. # XPath filtering, lxml is required by bs4 anyway, but put it here to be safe.
lxml lxml
PriorityThreadPoolExecutor
# 3.141 was missing socksVersion, 3.150 was not in pypi, so we try 4.1.0 # 3.141 was missing socksVersion, 3.150 was not in pypi, so we try 4.1.0
selenium ~= 4.1.0 selenium ~= 4.1.0

Loading…
Cancel
Save