@ -19,11 +19,9 @@ from loguru import logger
class update_worker ( threading . Thread ) :
current_uuid = None
def __init__ ( self , q, notification_q , app, datastore , * args , * * kwargs ) :
self . q = q
def __init__ ( self , app, * args , * * kwargs ) :
self . app = app
self . notification_q = notification_q
self . datastore = datastore
super ( ) . __init__ ( * args , * * kwargs )
def queue_notification_for_watch ( self , notification_q , n_object , watch ) :
@ -102,19 +100,19 @@ class update_worker(threading.Thread):
v = watch . get ( var_name )
if v and not watch . get ( ' notification_muted ' ) :
if var_name == ' notification_format ' and v == default_notification_format_for_watch :
return self . datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' notification_format ' )
return self . app. datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' notification_format ' )
return v
tags = self . datastore. get_all_tags_for_watch ( uuid = watch . get ( ' uuid ' ) )
tags = self . app. datastore. get_all_tags_for_watch ( uuid = watch . get ( ' uuid ' ) )
if tags :
for tag_uuid , tag in tags . items ( ) :
v = tag . get ( var_name )
if v and not tag . get ( ' notification_muted ' ) :
return v
if self . datastore. data [ ' settings ' ] [ ' application ' ] . get ( var_name ) :
return self . datastore. data [ ' settings ' ] [ ' application ' ] . get ( var_name )
if self . app. datastore. data [ ' settings ' ] [ ' application ' ] . get ( var_name ) :
return self . app. datastore. data [ ' settings ' ] [ ' application ' ] . get ( var_name )
# Otherwise could be defaults
if var_name == ' notification_format ' :
@ -129,7 +127,7 @@ class update_worker(threading.Thread):
def send_content_changed_notification ( self , watch_uuid ) :
n_object = { }
watch = self . datastore. data [ ' watching ' ] . get ( watch_uuid )
watch = self . app. datastore. data [ ' watching ' ] . get ( watch_uuid )
if not watch :
return
@ -156,17 +154,17 @@ class update_worker(threading.Thread):
queued = True
count = watch . get ( ' notification_alert_count ' , 0 ) + 1
self . datastore. update_watch ( uuid = watch_uuid , update_obj = { ' notification_alert_count ' : count } )
self . app. datastore. update_watch ( uuid = watch_uuid , update_obj = { ' notification_alert_count ' : count } )
self . queue_notification_for_watch ( notification_q = self . notification_q, n_object = n_object , watch = watch )
self . queue_notification_for_watch ( notification_q = self . app. notification_q, n_object = n_object , watch = watch )
return queued
def send_filter_failure_notification ( self , watch_uuid ) :
threshold = self . datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' )
watch = self . datastore. data [ ' watching ' ] . get ( watch_uuid )
threshold = self . app. datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' )
watch = self . app. datastore. data [ ' watching ' ] . get ( watch_uuid )
if not watch :
return
@ -179,8 +177,8 @@ class update_worker(threading.Thread):
if len ( watch [ ' notification_urls ' ] ) :
n_object [ ' notification_urls ' ] = watch [ ' notification_urls ' ]
elif len ( self . datastore. data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ] ) :
n_object [ ' notification_urls ' ] = self . datastore. data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ]
elif len ( self . app. datastore. data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ] ) :
n_object [ ' notification_urls ' ] = self . app. datastore. data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ]
# Only prepare to notify if the rules above matched
if ' notification_urls ' in n_object :
@ -189,16 +187,16 @@ class update_worker(threading.Thread):
' uuid ' : watch_uuid ,
' screenshot ' : None
} )
self . notification_q. put ( n_object )
self . app. notification_q. put ( n_object )
logger . debug ( f " Sent filter not found notification for { watch_uuid } " )
else :
logger . debug ( f " NOT sending filter not found notification for { watch_uuid } - no notification URLs " )
def send_step_failure_notification ( self , watch_uuid , step_n ) :
watch = self . datastore. data [ ' watching ' ] . get ( watch_uuid , False )
watch = self . app. datastore. data [ ' watching ' ] . get ( watch_uuid , False )
if not watch :
return
threshold = self . datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' )
threshold = self . app. datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' )
n_object = { ' notification_title ' : " Changedetection.io - Alert - Browser step at position {} could not be run " . format ( step_n + 1 ) ,
' notification_body ' : " Your configured browser step at position {} for {{ {{ watch_url}}}} "
" did not appear on the page after {} attempts, did the page change layout? "
@ -209,8 +207,8 @@ class update_worker(threading.Thread):
if len ( watch [ ' notification_urls ' ] ) :
n_object [ ' notification_urls ' ] = watch [ ' notification_urls ' ]
elif len ( self . datastore. data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ] ) :
n_object [ ' notification_urls ' ] = self . datastore. data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ]
elif len ( self . app. datastore. data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ] ) :
n_object [ ' notification_urls ' ] = self . app. datastore. data [ ' settings ' ] [ ' application ' ] [ ' notification_urls ' ]
# Only prepare to notify if the rules above matched
if ' notification_urls ' in n_object :
@ -218,7 +216,7 @@ class update_worker(threading.Thread):
' watch_url ' : watch [ ' url ' ] ,
' uuid ' : watch_uuid
} )
self . notification_q. put ( n_object )
self . app. notification_q. put ( n_object )
logger . error ( f " Sent step not found notification for { watch_uuid } " )
@ -226,7 +224,7 @@ class update_worker(threading.Thread):
# All went fine, remove error artifacts
cleanup_files = [ " last-error-screenshot.png " , " last-error.txt " ]
for f in cleanup_files :
full_path = os . path . join ( self . datastore. datastore_path , uuid , f )
full_path = os . path . join ( self . app. datastore. datastore_path , uuid , f )
if os . path . isfile ( full_path ) :
os . unlink ( full_path )
@ -237,23 +235,23 @@ class update_worker(threading.Thread):
update_handler = None
try :
queued_item_data = self . q. get ( block = False )
queued_item_data = self . app. update_ q. get ( block = False )
except queue . Empty :
pass
else :
uuid = queued_item_data . item . get ( ' uuid ' )
self . current_uuid = uuid
if uuid in list ( self . datastore. data [ ' watching ' ] . keys ( ) ) and self . datastore . data [ ' watching ' ] [ uuid ] . get ( ' url ' ) :
if uuid in list ( self . app. datastore. data [ ' watching ' ] . keys ( ) ) and self . app . datastore . data [ ' watching ' ] [ uuid ] . get ( ' url ' ) :
changed_detected = False
contents = b ' '
process_changedetection_results = True
update_obj = { }
# Clear last errors (move to preflight func?)
self . datastore. data [ ' watching ' ] [ uuid ] [ ' browser_steps_last_error_step ' ] = None
self . app. datastore. data [ ' watching ' ] [ uuid ] [ ' browser_steps_last_error_step ' ] = None
watch = self . datastore. data [ ' watching ' ] . get ( uuid )
watch = self . app. datastore. data [ ' watching ' ] . get ( uuid )
logger . info ( f " Processing watch UUID { uuid } Priority { queued_item_data . priority } URL { watch [ ' url ' ] } " )
now = time . time ( )
@ -270,7 +268,7 @@ class update_worker(threading.Thread):
print ( f " Processor module ' { processor } ' not found. " )
raise e
update_handler = processor_module . perform_site_check ( datastore = self . datastore,
update_handler = processor_module . perform_site_check ( datastore = self . app. datastore,
watch_uuid = uuid
)
@ -294,7 +292,7 @@ class update_worker(threading.Thread):
watch . save_screenshot ( screenshot = e . screenshot )
if e . xpath_data :
watch . save_xpath_data ( data = e . xpath_data )
self . datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : e . message } )
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : e . message } )
process_changedetection_results = False
except content_fetchers_exceptions . ReplyWithContentButNoText as e :
@ -311,7 +309,7 @@ class update_worker(threading.Thread):
else :
extra_help = " , it ' s possible that the filters were found, but contained no usable text. "
self . datastore. update_watch ( uuid = uuid , update_obj = {
self . app. datastore. update_watch ( uuid = uuid , update_obj = {
' last_error ' : f " Got HTML content but no text found (With { e . status_code } reply code) { extra_help } "
} )
@ -343,15 +341,15 @@ class update_worker(threading.Thread):
if e . page_text :
watch . save_error_text ( contents = e . page_text )
self . datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text } )
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text } )
process_changedetection_results = False
except FilterNotFoundInResponse as e :
if not self . datastore. data [ ' watching ' ] . get ( uuid ) :
if not self . app. datastore. data [ ' watching ' ] . get ( uuid ) :
continue
err_text = " Warning, no filters were found, no change detection ran - Did the page change layout? update your Visual Filter if necessary. "
self . datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text } )
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text } )
# Filter wasnt found, but we should still update the visual selector so that they can have a chance to set it up again
if e . screenshot :
@ -365,7 +363,7 @@ class update_worker(threading.Thread):
c = watch . get ( ' consecutive_filter_failures ' , 0 )
c + = 1
# Send notification if we reached the threshold?
threshold = self . datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' , 0 )
threshold = self . app. datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' , 0 )
logger . debug ( f " Filter for { uuid } not found, consecutive_filter_failures: { c } of threshold { threshold } " )
if c > = threshold :
if not watch . get ( ' notification_muted ' ) :
@ -374,7 +372,7 @@ class update_worker(threading.Thread):
c = 0
logger . debug ( f " Reset filter failure count back to zero " )
self . datastore. update_watch ( uuid = uuid , update_obj = { ' consecutive_filter_failures ' : c } )
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' consecutive_filter_failures ' : c } )
else :
logger . trace ( f " { uuid } - filter_failure_notification_send not enabled, skipping " )
@ -386,20 +384,20 @@ class update_worker(threading.Thread):
process_changedetection_results = False
changed_detected = False
except content_fetchers_exceptions . BrowserConnectError as e :
self . datastore. update_watch ( uuid = uuid ,
self . app. datastore. update_watch ( uuid = uuid ,
update_obj = { ' last_error ' : e . msg
}
)
process_changedetection_results = False
except content_fetchers_exceptions . BrowserFetchTimedOut as e :
self . datastore. update_watch ( uuid = uuid ,
self . app. datastore. update_watch ( uuid = uuid ,
update_obj = { ' last_error ' : e . msg
}
)
process_changedetection_results = False
except content_fetchers_exceptions . BrowserStepsStepException as e :
if not self . datastore. data [ ' watching ' ] . get ( uuid ) :
if not self . app. datastore. data [ ' watching ' ] . get ( uuid ) :
continue
error_step = e . step_n + 1
@ -417,7 +415,7 @@ class update_worker(threading.Thread):
logger . debug ( f " BrowserSteps exception at step { error_step } { str ( e . original_e ) } " )
self . datastore. update_watch ( uuid = uuid ,
self . app. datastore. update_watch ( uuid = uuid ,
update_obj = { ' last_error ' : err_text ,
' browser_steps_last_error_step ' : error_step
}
@ -427,7 +425,7 @@ class update_worker(threading.Thread):
c = watch . get ( ' consecutive_filter_failures ' , 0 )
c + = 1
# Send notification if we reached the threshold?
threshold = self . datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' ,
threshold = self . app. datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' filter_failure_notification_threshold_attempts ' ,
0 )
logger . error ( f " Step for { uuid } not found, consecutive_filter_failures: { c } " )
if threshold > 0 and c > = threshold :
@ -435,26 +433,26 @@ class update_worker(threading.Thread):
self . send_step_failure_notification ( watch_uuid = uuid , step_n = e . step_n )
c = 0
self . datastore. update_watch ( uuid = uuid , update_obj = { ' consecutive_filter_failures ' : c } )
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' consecutive_filter_failures ' : c } )
process_changedetection_results = False
except content_fetchers_exceptions . EmptyReply as e :
# Some kind of custom to-str handler in the exception handler that does this?
err_text = " EmptyReply - try increasing ' Wait seconds before extracting text ' , Status Code {} " . format ( e . status_code )
self . datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
' last_check_status ' : e . status_code } )
process_changedetection_results = False
except content_fetchers_exceptions . ScreenshotUnavailable as e :
err_text = " Screenshot unavailable, page did not render fully in the expected time or page was too long - try increasing ' Wait seconds before extracting text ' "
self . datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
' last_check_status ' : e . status_code } )
process_changedetection_results = False
except content_fetchers_exceptions . JSActionExceptions as e :
err_text = " Error running JS Actions - Page request - " + e . message
if e . screenshot :
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
self . datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
' last_check_status ' : e . status_code } )
process_changedetection_results = False
except content_fetchers_exceptions . PageUnloadable as e :
@ -465,26 +463,26 @@ class update_worker(threading.Thread):
if e . screenshot :
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
self . datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
' last_check_status ' : e . status_code ,
' has_ldjson_price_data ' : None } )
process_changedetection_results = False
except content_fetchers_exceptions . BrowserStepsInUnsupportedFetcher as e :
err_text = " This watch has Browser Steps configured and so it cannot run with the ' Basic fast Plaintext/HTTP Client ' , either remove the Browser Steps or select a Chrome fetcher. "
self . datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text } )
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text } )
process_changedetection_results = False
logger . error ( f " Exception (BrowserStepsInUnsupportedFetcher) reached processing watch UUID: { uuid } " )
except Exception as e :
logger . error ( f " Exception reached processing watch UUID: { uuid } " )
logger . error ( str ( e ) )
self . datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : " Exception: " + str ( e ) } )
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : " Exception: " + str ( e ) } )
# Other serious error
process_changedetection_results = False
else :
# Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
if not self . datastore. data [ ' watching ' ] . get ( uuid ) :
if not self . app. datastore. data [ ' watching ' ] . get ( uuid ) :
continue
update_obj [ ' content-type ' ] = update_handler . fetcher . get_all_headers ( ) . get ( ' content-type ' , ' ' ) . lower ( )
@ -498,14 +496,14 @@ class update_worker(threading.Thread):
self . cleanup_error_artifacts ( uuid )
if not self . datastore. data [ ' watching ' ] . get ( uuid ) :
if not self . app. datastore. data [ ' watching ' ] . get ( uuid ) :
continue
#
# Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
if process_changedetection_results :
# Extract <title> as title if possible/requested.
if self . datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' extract_title_as_title ' ) or watch [ ' extract_title_as_title ' ] :
if self . app. datastore. data [ ' settings ' ] [ ' application ' ] . get ( ' extract_title_as_title ' ) or watch [ ' extract_title_as_title ' ] :
if not watch [ ' title ' ] or not len ( watch [ ' title ' ] ) :
try :
update_obj [ ' title ' ] = html_tools . extract_element ( find = ' title ' , html_content = update_handler . fetcher . content )
@ -516,7 +514,7 @@ class update_worker(threading.Thread):
# Now update after running everything
timestamp = round ( time . time ( ) )
try :
self . datastore. update_watch ( uuid = uuid , update_obj = update_obj )
self . app. datastore. update_watch ( uuid = uuid , update_obj = update_obj )
# Also save the snapshot on the first time checked, "last checked" will always be updated, so we just check history length.
@ -554,7 +552,7 @@ class update_worker(threading.Thread):
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
logger . critical ( " !!!! Exception in update_worker while processing process_changedetection_results !!! " )
logger . critical ( str ( e ) )
self . datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : str ( e ) } )
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' last_error ' : str ( e ) } )
# Always record that we atleast tried
@ -563,13 +561,13 @@ class update_worker(threading.Thread):
# Record the 'server' header reply, can be used for actions in the future like cloudflare/akamai workarounds
try :
server_header = update_handler . fetcher . headers . get ( ' server ' , ' ' ) . strip ( ) . lower ( ) [ : 255 ]
self . datastore. update_watch ( uuid = uuid ,
self . app. datastore. update_watch ( uuid = uuid ,
update_obj = { ' remote_server_reply ' : server_header }
)
except Exception as e :
pass
self . datastore. update_watch ( uuid = uuid , update_obj = { ' fetch_time ' : round ( time . time ( ) - now , 3 ) ,
self . app. datastore. update_watch ( uuid = uuid , update_obj = { ' fetch_time ' : round ( time . time ( ) - now , 3 ) ,
' last_checked ' : round ( time . time ( ) ) ,
' check_count ' : count
} )