threading-memory-improvements
dgtlmoon 2 years ago
parent 1f27865fdf
commit 7823140442

@ -1213,6 +1213,7 @@ def changedetection_app(config=None, datastore_o=None):
# @todo handle ctrl break # @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start() ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
threading.Thread(target=ticker_thread_job_queue_processor).start()
threading.Thread(target=notification_runner).start() threading.Thread(target=notification_runner).start()
@ -1288,25 +1289,63 @@ def notification_runner():
# Trim the log length # Trim the log length
notification_debug_log = notification_debug_log[-100:] notification_debug_log = notification_debug_log[-100:]
# Check the queue, when a job exists, start a fresh thread of update_worker
def ticker_thread_job_queue_processor():
from changedetectionio import update_worker
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
while not app.config.exit.is_set():
time.sleep(0.3)
# Check that some threads are free
running = 0
for t in threading.enumerate():
if t.name == 'update_worker':
running += 1
if running >= n_workers:
continue
try:
uuid = update_q.get(block=False)
except queue.Empty:
# Go back to waiting for exit and/or another entry from the queue
continue
print ("Starting a thread fetch")
try:
# Launch the update_worker thread that will handle picking items off a queue and sending them off
# in the event that playwright or others have a memory leak, this should clean it up better than gc.collect()
# (By letting it exit entirely)
update_worker.update_worker(update_q, notification_q, app, datastore, uuid).start()
except Exception as e:
print ("Error launching update_worker for UUID {}.".format(uuid))
print (str(e))
print ("Running now {}", running)
# Thread runner to check every minute, look for new watches to feed into the Queue. # Thread runner to check every minute, look for new watches to feed into the Queue.
def ticker_thread_check_time_launch_checks(): def ticker_thread_check_time_launch_checks():
import random import random
from changedetectionio import update_worker
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20)) recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds) print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds)
# Can go in its own function
# Always maintain the minimum number of threads, each thread will terminate when it has processed exactly 1 queued watch
# This is to be totally sure that they don't leak memory
# Spin up Workers that do the fetching # Spin up Workers that do the fetching
# Can be overriden by ENV or use the default settings # Can be overriden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
for _ in range(n_workers):
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()
while not app.config.exit.is_set(): while not app.config.exit.is_set():
# Get a list of watches by UUID that are currently fetching data # Update our list of watches by UUID that are currently fetching data, used in the UI
running_uuids = [] running_uuids = []
for t in running_update_threads: for t in running_update_threads:
if t.current_uuid: if t.current_uuid:

@ -7,19 +7,20 @@ from changedetectionio.html_tools import FilterNotFoundInResponse
# A single update worker # A single update worker
# #
# Requests for checking on a single site(watch) from a queue of watches #
# (another process inserts watches into the queue that are time-ready for checking)
class update_worker(threading.Thread): class update_worker(threading.Thread):
current_uuid = None current_uuid = None
def __init__(self, q, notification_q, app, datastore, *args, **kwargs): def __init__(self, q, notification_q, app, datastore, uuid, *args, **kwargs):
self.q = q self.q = q
self.app = app self.app = app
self.notification_q = notification_q self.notification_q = notification_q
self.datastore = datastore self.datastore = datastore
self.current_uuid = uuid
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.name = "update_worker"
def send_filter_failure_notification(self, uuid): def send_filter_failure_notification(self, uuid):
@ -47,169 +48,170 @@ class update_worker(threading.Thread):
self.notification_q.put(n_object) self.notification_q.put(n_object)
print("Sent filter not found notification for {}".format(uuid)) print("Sent filter not found notification for {}".format(uuid))
# Pick one job off the list, process it threaded, exist
def run(self): def run(self):
# Go talk to the website
self.perform_site_update()
self.current_uuid = None # Done
self.q.task_done()
# Let the thread die after processing 1
# We will launch nice juicy fresh threads every time to prevent memory leaks in complex runner code (playwright etc)
print ("EXITING THREAD!")
self.app.config.exit.wait(1)
return
def perform_site_update(self):
from changedetectionio import fetch_site_status from changedetectionio import fetch_site_status
update_handler = fetch_site_status.perform_site_check(datastore=self.datastore) if not self.current_uuid in list(self.datastore.data['watching'].keys()):
return
while not self.app.config.exit.is_set(): changed_detected = False
contents = ""
screenshot = False
update_obj= {}
xpath_data = False
now = time.time()
update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)
try:
changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(self.current_uuid)
# Re #342
# In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
# We then convert/.decode('utf-8') for the notification etc
if not isinstance(contents, (bytes, bytearray)):
raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
except PermissionError as e:
self.app.logger.error("File permission error updating", self.current_uuid, str(e))
except content_fetcher.ReplyWithContentButNoText as e:
# Totally fine, it's by choice - just continue on, nothing more to care about
# Page had elements/content but no renderable text
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': "Got HTML content but no text found."})
except FilterNotFoundInResponse as e:
err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e))
c = 0
if self.datastore.data['watching'].get(self.current_uuid, False):
c = self.datastore.data['watching'][self.current_uuid].get('consecutive_filter_failures', 5)
c += 1
# Send notification if we reached the threshold?
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
print("Filter for {} not found, consecutive_filter_failures: {}".format(self.current_uuid, c))
if threshold >0 and c >= threshold:
self.send_filter_failure_notification(self.current_uuid)
c = 0
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'consecutive_filter_failures': c})
except content_fetcher.EmptyReply as e:
# Some kind of custom to-str handler in the exception handler that does this?
err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code})
except content_fetcher.ScreenshotUnavailable as e:
err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code})
except content_fetcher.PageUnloadable as e:
err_text = "Page request from server didnt respond correctly"
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code})
except Exception as e:
self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e))
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)})
else:
try: try:
uuid = self.q.get(block=False) watch = self.datastore.data['watching'][self.current_uuid]
except queue.Empty: fname = "" # Saved history text filename
pass
# For the FIRST time we check a site, or a change detected, save the snapshot.
else: if changed_detected or not watch['last_checked']:
self.current_uuid = uuid # A change was detected
fname = watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
if uuid in list(self.datastore.data['watching'].keys()):
# Generally update anything interesting returned
changed_detected = False update_obj['consecutive_filter_failures'] = 0
contents = "" self.datastore.update_watch(uuid=self.current_uuid, update_obj=update_obj)
screenshot = False
update_obj= {} # A change was detected
xpath_data = False if changed_detected:
now = time.time() n_object = {}
print (">> Change detected in UUID {} - {}".format(self.current_uuid, watch['url']))
try:
changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(uuid) # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
# Re #342 if watch.history_n >= 2:
# In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes. # Atleast 2, means there really was a change
# We then convert/.decode('utf-8') for the notification etc self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_changed': round(now)})
if not isinstance(contents, (bytes, bytearray)):
raise Exception("Error - returned data from the fetch handler SHOULD be bytes") watch_history = watch.history
except PermissionError as e: dates = list(watch_history.keys())
self.app.logger.error("File permission error updating", uuid, str(e)) # Theoretically it's possible that this could be just 1 long,
except content_fetcher.ReplyWithContentButNoText as e: # - In the case that the timestamp key was not unique
# Totally fine, it's by choice - just continue on, nothing more to care about if len(dates) == 1:
# Page had elements/content but no renderable text raise ValueError(
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found."}) "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
except FilterNotFoundInResponse as e: )
err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e)) prev_fname = watch_history[dates[-2]]
c = 0
if self.datastore.data['watching'].get(uuid, False): # Did it have any notification alerts to hit?
c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5) if len(watch['notification_urls']):
c += 1 print(">>> Notifications queued for UUID from watch {}".format(self.current_uuid))
n_object['notification_urls'] = watch['notification_urls']
# Send notification if we reached the threshold? n_object['notification_title'] = watch['notification_title']
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0) n_object['notification_body'] = watch['notification_body']
print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c)) n_object['notification_format'] = watch['notification_format']
if threshold >0 and c >= threshold:
self.send_filter_failure_notification(uuid) # No? maybe theres a global setting, queue them all
c = 0 elif len(self.datastore.data['settings']['application']['notification_urls']):
print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(self.current_uuid))
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
'consecutive_filter_failures': c}) n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
except content_fetcher.EmptyReply as e: n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
# Some kind of custom to-str handler in the exception handler that does this? n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format']
err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code) else:
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, print(">>> NO notifications queued, watch and global notification URLs were empty.")
'last_check_status': e.status_code})
except content_fetcher.ScreenshotUnavailable as e: # Only prepare to notify if the rules above matched
err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'" if 'notification_urls' in n_object:
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, # HTML needs linebreak, but MarkDown and Text can use a linefeed
'last_check_status': e.status_code}) if n_object['notification_format'] == 'HTML':
except content_fetcher.PageUnloadable as e: line_feed_sep = "</br>"
err_text = "Page request from server didnt respond correctly" else:
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, line_feed_sep = "\n"
'last_check_status': e.status_code})
except Exception as e: from changedetectionio import diff
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) n_object.update({
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) 'watch_url': watch['url'],
'uuid': self.current_uuid,
else: 'current_snapshot': contents.decode('utf-8'),
try: 'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
watch = self.datastore.data['watching'][uuid] 'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
fname = "" # Saved history text filename })
# For the FIRST time we check a site, or a change detected, save the snapshot. self.notification_q.put(n_object)
if changed_detected or not watch['last_checked']:
# A change was detected except Exception as e:
fname = watch.save_history_text(contents=contents, timestamp=str(round(time.time()))) # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
print("!!!! Exception in update_worker !!!\n", e)
# Generally update anything interesting returned self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e))
update_obj['consecutive_filter_failures'] = 0 self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)})
self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
finally:
# A change was detected # Always record that we atleast tried
if changed_detected: self.datastore.update_watch(uuid=self.current_uuid, update_obj={'fetch_time': round(time.time() - now, 3),
n_object = {} 'last_checked': round(time.time())})
print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
# Always save the screenshot if it's available
# Notifications should only trigger on the second time (first time, we gather the initial snapshot) if screenshot:
if watch.history_n >= 2: self.datastore.save_screenshot(watch_uuid=self.current_uuid, screenshot=screenshot)
# Atleast 2, means there really was a change if xpath_data:
self.datastore.update_watch(uuid=uuid, update_obj={'last_changed': round(now)}) self.datastore.save_xpath_data(watch_uuid=self.current_uuid, data=xpath_data)
watch_history = watch.history
dates = list(watch_history.keys())
# Theoretically it's possible that this could be just 1 long,
# - In the case that the timestamp key was not unique
if len(dates) == 1:
raise ValueError(
"History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
)
prev_fname = watch_history[dates[-2]]
# Did it have any notification alerts to hit?
if len(watch['notification_urls']):
print(">>> Notifications queued for UUID from watch {}".format(uuid))
n_object['notification_urls'] = watch['notification_urls']
n_object['notification_title'] = watch['notification_title']
n_object['notification_body'] = watch['notification_body']
n_object['notification_format'] = watch['notification_format']
# No? maybe theres a global setting, queue them all
elif len(self.datastore.data['settings']['application']['notification_urls']):
print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(uuid))
n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format']
else:
print(">>> NO notifications queued, watch and global notification URLs were empty.")
# Only prepare to notify if the rules above matched
if 'notification_urls' in n_object:
# HTML needs linebreak, but MarkDown and Text can use a linefeed
if n_object['notification_format'] == 'HTML':
line_feed_sep = "</br>"
else:
line_feed_sep = "\n"
from changedetectionio import diff
n_object.update({
'watch_url': watch['url'],
'uuid': uuid,
'current_snapshot': contents.decode('utf-8'),
'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
})
self.notification_q.put(n_object)
except Exception as e:
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
print("!!!! Exception in update_worker !!!\n", e)
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
finally:
# Always record that we atleast tried
self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
'last_checked': round(time.time())})
# Always save the screenshot if it's available
if screenshot:
self.datastore.save_screenshot(watch_uuid=uuid, screenshot=screenshot)
if xpath_data:
self.datastore.save_xpath_data(watch_uuid=uuid, data=xpath_data)
self.current_uuid = None # Done
self.q.task_done()
# Give the CPU time to interrupt
time.sleep(0.1)
self.app.config.exit.wait(1)

Loading…
Cancel
Save