@ -1,11 +1,12 @@
from . import content_fetchers
from . processors . restock_diff import UnableToExtractRestockData
from . processors . text_json_diff import FilterNotFoundInResponse
from changedetectionio import html_tools
from copy import deepcopy
import os
import threading
import queue
import threading
import time
from . import content_fetchers
from changedetectionio import html_tools
from . processors . text_json_diff import FilterNotFoundInResponse
from . processors . restock_diff import UnableToExtractRestockData
# A single update worker
#
@ -245,14 +246,19 @@ class update_worker(threading.Thread):
contents = b ' '
process_changedetection_results = True
update_obj = { }
logger . info ( f " Processing watch UUID { uuid } "
f " Priority { queued_item_data . priority } "
f " URL { self . datastore . data [ ' watching ' ] [ uuid ] [ ' url ' ] } " )
# Clear last errors (move to preflight func?)
self . datastore . data [ ' watching ' ] [ uuid ] [ ' browser_steps_last_error_step ' ] = None
# Deep copy so we can be sure we don't accidentally change anything by reference
watch = deepcopy ( self . datastore . data [ ' watching ' ] . get ( uuid ) )
logger . info ( f " Processing watch UUID { uuid } Priority { queued_item_data . priority } URL { watch [ ' url ' ] } " )
now = time . time ( )
try :
# Processor is what we are using for detecting the "Change"
processor = self . datastore . data [ ' watching ' ] [ uuid ] . get ( ' processor ' , ' text_json_diff ' )
processor = watch. get ( ' processor ' , ' text_json_diff ' )
# if system...
# Abort processing when the content was the same as the last fetch
@ -272,12 +278,10 @@ class update_worker(threading.Thread):
watch_uuid = uuid
)
# Clear last errors (move to preflight func?)
self . datastore . data [ ' watching ' ] [ uuid ] [ ' browser_steps_last_error_step ' ] = None
update_handler . call_browser ( )
changed_detected , update_obj , contents = update_handler . run_changedetection ( uuid ,
changed_detected , update_obj , contents = update_handler . run_changedetection (
watch = watch ,
skip_when_checksum_same = skip_when_same_checksum ,
)
@ -309,7 +313,8 @@ class update_worker(threading.Thread):
} )
if e . screenshot :
self . datastore . save_screenshot ( watch_uuid = uuid , screenshot = e . screenshot )
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
process_changedetection_results = False
except content_fetchers . exceptions . Non200ErrorCodeReceived as e :
@ -325,11 +330,11 @@ class update_worker(threading.Thread):
err_text = " Error - Request returned a HTTP error code {} " . format ( str ( e . status_code ) )
if e . screenshot :
self . datastore . save_screenshot ( watch_uuid = uuid , screenshot = e . screenshot , as_error = True )
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
if e . xpath_data :
self . datastore . save_xpath_data ( watch_uuid = uuid , data = e . xpath_data , as_error = True )
watch . save_xpath_data ( data = e . xpath_data , as_error = True )
if e . page_text :
self . datastore . save_error_text ( watch_uuid = uuid , contents = e . page_text )
watch . save_error_text ( contents = e . page_text )
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text } )
process_changedetection_results = False
@ -340,17 +345,19 @@ class update_worker(threading.Thread):
err_text = " Warning, no filters were found, no change detection ran - Did the page change layout? update your Visual Filter if necessary. "
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text } )
if e . screenshot :
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
# Only when enabled, send the notification
if self . datastore. data [ ' watching' ] [ uuid ] . get ( ' filter_failure_notification_send ' , False ) :
c = self . datastore. data [ ' watching' ] [ uuid ] . get ( ' consecutive_filter_failures ' , 5 )
if watch. get ( ' filter_failure_notification_send ' , False ) :
c = watch. get ( ' consecutive_filter_failures ' , 5 )
c + = 1
# Send notification if we reached the threshold?
threshold = self . datastore . data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' ,
0 )
logger . warning ( f " Filter for { uuid } not found, consecutive_filter_failures: { c } " )
if threshold > 0 and c > = threshold :
if not self . datastore. data [ ' watching' ] [ uuid ] . get ( ' notification_muted ' ) :
if not watch. get ( ' notification_muted ' ) :
self . send_filter_failure_notification ( uuid )
c = 0
@ -400,15 +407,15 @@ class update_worker(threading.Thread):
}
)
if self . datastore. data [ ' watching' ] [ uuid ] . get ( ' filter_failure_notification_send ' , False ) :
c = self . datastore. data [ ' watching' ] [ uuid ] . get ( ' consecutive_filter_failures ' , 5 )
if watch. get ( ' filter_failure_notification_send ' , False ) :
c = watch. get ( ' consecutive_filter_failures ' , 5 )
c + = 1
# Send notification if we reached the threshold?
threshold = self . datastore . data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' ,
0 )
logger . error ( f " Step for { uuid } not found, consecutive_filter_failures: { c } " )
if threshold > 0 and c > = threshold :
if not self . datastore. data [ ' watching' ] [ uuid ] . get ( ' notification_muted ' ) :
if not watch. get ( ' notification_muted ' ) :
self . send_step_failure_notification ( watch_uuid = uuid , step_n = e . step_n )
c = 0
@ -430,7 +437,7 @@ class update_worker(threading.Thread):
except content_fetchers . exceptions . JSActionExceptions as e :
err_text = " Error running JS Actions - Page request - " + e . message
if e . screenshot :
self . datastore . save_screenshot ( watch_uuid = uuid , screenshot = e . screenshot , as_error = True )
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
' last_check_status ' : e . status_code } )
process_changedetection_results = False
@ -440,7 +447,7 @@ class update_worker(threading.Thread):
err_text = " {} - {} " . format ( err_text , e . message )
if e . screenshot :
self . datastore . save_screenshot ( watch_uuid = uuid , screenshot = e . screenshot , as_error = True )
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
' last_check_status ' : e . status_code ,
@ -464,8 +471,6 @@ class update_worker(threading.Thread):
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : " Exception: " + str ( e ) } )
# Other serious error
process_changedetection_results = False
# import traceback
# print(traceback.format_exc())
else :
# Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
@ -473,7 +478,7 @@ class update_worker(threading.Thread):
continue
# Mark that we never had any failures
if not self . datastore. data [ ' watching' ] [ uuid ] . get ( ' ignore_status_codes ' ) :
if not watch. get ( ' ignore_status_codes ' ) :
update_obj [ ' consecutive_filter_failures ' ] = 0
# Everything ran OK, clean off any previous error
@ -481,25 +486,48 @@ class update_worker(threading.Thread):
self . cleanup_error_artifacts ( uuid )
if not self . datastore . data [ ' watching ' ] . get ( uuid ) :
continue
#
# Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
if process_changedetection_results :
# Always save the screenshot if it's available
if update_handler . screenshot :
watch . save_screenshot ( screenshot = update_handler . screenshot )
if update_handler . xpath_data :
watch . save_xpath_data ( data = update_handler . xpath_data )
try :
watch = self . datastore . data [ ' watching ' ] . get ( uuid )
self . datastore . update_watch ( uuid = uuid , update_obj = update_obj )
# Also save the snapshot on the first time checked
if changed_detected or not watch [ ' last_checked ' ] :
if changed_detected or not watch . get ( ' last_checked ' ) :
timestamp = round ( time . time ( ) )
# Small hack so that we sleep just enough to allow 1 second between history snapshots.
# This is because history.txt indexes/keys snapshots by epoch seconds and we don't want duplicate keys
if watch . newest_history_key and int ( timestamp ) == int ( watch . newest_history_key ) :
logger . warning (
f " Timestamp { timestamp } already exists, waiting 1 seconds so we have a unique key in history.txt " )
timestamp = str ( int ( timestamp ) + 1 )
time . sleep ( 1 )
watch . save_history_text ( contents = contents ,
timestamp = str ( round ( time . time ( ) ) ) ,
timestamp = timestamp ,
snapshot_id = update_obj . get ( ' previous_md5 ' , ' none ' ) )
if update_handler . fetcher . content :
watch . save_last_fetched_html ( contents = update_handler . fetcher . content , timestamp = timestamp )
# A change was detected
if changed_detected :
# Notifications should only trigger on the second time (first time, we gather the initial snapshot)
if watch . history_n > = 2 :
logger . info ( f " Change detected in UUID { uuid } - { watch [ ' url ' ] } " )
if not self . datastore . data [ ' watching ' ] [ uuid ] . get ( ' notification_muted ' ) :
if not watch. get ( ' notification_muted ' ) :
self . send_content_changed_notification ( watch_uuid = uuid )
else :
logger . info ( f " Change triggered in UUID { uuid } due to first history saving (no notifications sent) - { watch [ ' url ' ] } " )
@ -510,9 +538,9 @@ class update_worker(threading.Thread):
logger . critical ( str ( e ) )
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : str ( e ) } )
if self . datastore . data [ ' watching ' ] . get ( uuid ) :
# Always record that we at least tried
count = self . datastore. data [ ' watching' ] [ uuid ] . get ( ' check_count ' , 0 ) + 1
count = watch. get ( ' check_count ' , 0 ) + 1
# Record the 'server' header reply, can be used for actions in the future like cloudflare/akamai workarounds
try :
@ -528,12 +556,6 @@ class update_worker(threading.Thread):
' check_count ' : count
} )
# Always save the screenshot if it's available
if update_handler . screenshot :
self . datastore . save_screenshot ( watch_uuid = uuid , screenshot = update_handler . screenshot )
if update_handler . xpath_data :
self . datastore . save_xpath_data ( watch_uuid = uuid , data = update_handler . xpath_data )
self . current_uuid = None # Done
self . q . task_done ( )