Maintain the pool of workers in its own thread for more reliability and flexibility

adjustable-workers
dgtlmoon 11 months ago
parent ccb42bcb12
commit 8aa675cbf0

@ -1547,6 +1547,7 @@ def changedetection_app(config=None, datastore_o=None):
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
threading.Thread(target=notification_runner).start()
threading.Thread(target=thread_maintain_worker_thread_pool).start()
# Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not config.get('disable_checkver') == True:
@ -1629,23 +1630,73 @@ def notification_runner():
# Trim the log length
notification_debug_log = notification_debug_log[-100:]
def thread_maintain_worker_thread_pool():
from changedetectionio import update_worker
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
while not app.config.exit.is_set():
needed_threads = n_workers if not running_update_threads else 0
how_many_running_now = 0
dead_threads = []
for i, t in enumerate(running_update_threads):
if t.is_alive():
how_many_running_now += 1
else:
dead_threads.append(i)
for i in dead_threads:
del running_update_threads[i]
for _ in range(needed_threads - how_many_running_now):
logger.info("Adding new worker thread")
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()
app.config.exit.wait(2)
def thread_maintain_worker_thread_pool():
from changedetectionio import update_worker
logger.info("Starting thread pool worker maintainer thread")
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
while not app.config.exit.is_set():
needed_threads = n_workers if not running_update_threads else 0
how_many_running_now = 0
dead_threads = []
for i, t in enumerate(running_update_threads):
if t.is_alive():
how_many_running_now += 1
else:
dead_threads.append(i)
for i in dead_threads:
del running_update_threads[i]
for _ in range(needed_threads - how_many_running_now):
logger.info("Adding new worker thread")
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()
app.config.exit.wait(2)
# Thread runner to check every minute, look for new watches to feed into the Queue.
def ticker_thread_check_time_launch_checks():
import random
from changedetectionio import update_worker
proxy_last_called_time = {}
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}")
# Spin up Workers that do the fetching
# Can be overriden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
for _ in range(n_workers):
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()
while not app.config.exit.is_set():
@ -1728,7 +1779,7 @@ def ticker_thread_check_time_launch_checks():
priority = int(time.time())
logger.debug(
f"> Queued watch UUID {uuid} "
f"last checked at {watch['last_checked']} "
f"last checked at {watch['last_checked']} ({seconds_since_last_recheck} seconds ago!) recheck min was :{recheck_time_minimum_seconds} "
f"queued at {now:0.2f} priority {priority} "
f"jitter {watch.jitter_seconds:0.2f}s, "
f"{now - watch['last_checked']:0.2f}s since last checked")

Loading…
Cancel
Save