@ -9,6 +9,11 @@ import queue
import threading
import threading
import time
import time
from changedetectionio import content_fetcher , html_tools
from . processors . restock_diff import UnableToExtractRestockData
from . processors . text_json_diff import FilterNotFoundInResponse
# A single update worker
# A single update worker
#
#
# Requests for checking on a single site(watch) from a queue of watches
# Requests for checking on a single site(watch) from a queue of watches
@ -69,11 +74,16 @@ class update_worker(threading.Thread):
n_object . update ( {
n_object . update ( {
' current_snapshot ' : snapshot_contents ,
' current_snapshot ' : snapshot_contents ,
' diff ' : diff . render_diff ( prev_snapshot , current_snapshot , line_feed_sep = line_feed_sep ) ,
' diff ' : diff . render_diff ( prev_snapshot , current_snapshot ,
' diff_added ' : diff . render_diff ( prev_snapshot , current_snapshot , include_removed = False , line_feed_sep = line_feed_sep ) ,
line_feed_sep = line_feed_sep ) ,
' diff_full ' : diff . render_diff ( prev_snapshot , current_snapshot , include_equal = True , line_feed_sep = line_feed_sep ) ,
' diff_added ' : diff . render_diff ( prev_snapshot , current_snapshot ,
' diff_patch ' : diff . render_diff ( prev_snapshot , current_snapshot , line_feed_sep = line_feed_sep , patch_format = True ) ,
include_removed = False , line_feed_sep = line_feed_sep ) ,
' diff_removed ' : diff . render_diff ( prev_snapshot , current_snapshot , include_added = False , line_feed_sep = line_feed_sep ) ,
' diff_full ' : diff . render_diff ( prev_snapshot , current_snapshot ,
include_equal = True , line_feed_sep = line_feed_sep ) ,
' diff_patch ' : diff . render_diff ( prev_snapshot , current_snapshot ,
line_feed_sep = line_feed_sep , patch_format = True ) ,
' diff_removed ' : diff . render_diff ( prev_snapshot , current_snapshot , include_added = False ,
line_feed_sep = line_feed_sep ) ,
' notification_timestamp ' : now ,
' notification_timestamp ' : now ,
' screenshot ' : watch . get_screenshot ( ) if watch and watch . get ( ' notification_screenshot ' ) else None ,
' screenshot ' : watch . get_screenshot ( ) if watch and watch . get ( ' notification_screenshot ' ) else None ,
' triggered_text ' : triggered_text ,
' triggered_text ' : triggered_text ,
@ -218,6 +228,26 @@ class update_worker(threading.Thread):
self . notification_q . put ( n_object )
self . notification_q . put ( n_object )
logger . error ( f " Sent step not found notification for { watch_uuid } " )
logger . error ( f " Sent step not found notification for { watch_uuid } " )
def send_failure_notification ( self , watch_uuid : str , error_text : str ) :
watch = self . datastore . data [ ' watching ' ] . get ( watch_uuid )
if not watch :
return
n_object = { ' notification_title ' : ' Changedetection.io - Alert - {} ' . format ( error_text ) ,
' notification_body ' : " Your watch {{ watch_url}} failed! \n \n Link: {{ base_url}}/edit/ {{ watch_uuid}} \n \n Thanks - Your omniscient changedetection.io installation :) \n " ,
' notification_format ' : ' text ' }
n_object [ ' notification_urls ' ] = self . _check_cascading_vars ( ' notification_urls ' , watch )
# Only prepare to notify if the rules above matched
if n_object and n_object . get ( ' notification_urls ' ) :
n_object . update ( {
' watch_url ' : watch [ ' url ' ] ,
' uuid ' : watch_uuid ,
' screenshot ' : None
} )
self . notification_q . put ( n_object )
print ( " Sent error notification for {} " . format ( watch_uuid ) )
def cleanup_error_artifacts ( self , uuid ) :
def cleanup_error_artifacts ( self , uuid ) :
# All went fine, remove error artifacts
# All went fine, remove error artifacts
@ -227,9 +257,47 @@ class update_worker(threading.Thread):
if os . path . isfile ( full_path ) :
if os . path . isfile ( full_path ) :
os . unlink ( full_path )
os . unlink ( full_path )
def _update_watch ( self , uuid , update_obj , exception ) :
# TODO check if update succeeded or had an error.
# If it had an error, handle notifications
# If it did not have one, clean up any error states
# TODO Future - loop over notification handlers and send them the update_obj, allowing modification
last_error = update_obj . get ( ' last_error ' , False )
if last_error :
# TODO Future - message notification handlers
if self . datastore . data [ ' watching ' ] [ uuid ] . get ( ' notification_notify_on_failure ' , False ) :
self . send_failure_notification ( watch_uuid = uuid , error_text = update_obj [ ' last_error ' ] )
pass
else :
# TODO Future - message notification handlers
pass
self . datastore . update_watch ( uuid = uuid , update_obj = update_obj )
if isinstance ( exception , FilterNotFoundInResponse ) or isinstance ( exception , content_fetcher . BrowserStepsStepTimout ) :
# Only when enabled, send the notification
if self . datastore . data [ ' watching ' ] [ uuid ] . get ( ' filter_failure_notification_send ' , False ) :
c = self . datastore . data [ ' watching ' ] [ uuid ] . get ( ' consecutive_filter_failures ' , 5 )
c + = 1
# Send notification if we reached the threshold?
threshold = self . datastore . data [ ' settings ' ] [ ' application ' ] . get (
' filter_failure_notification_threshold_attempts ' ,
0 )
print ( " Filter for {} not found, consecutive_filter_failures: {} " . format ( uuid , c ) )
if threshold > 0 and c > = threshold :
if not self . datastore . data [ ' watching ' ] [ uuid ] . get ( ' notification_muted ' ) :
if isinstance ( exception , FilterNotFoundInResponse ) :
self . send_filter_failure_notification ( uuid )
else :
self . send_step_failure_notification ( watch_uuid = uuid , step_n = exception . step_n )
c = 0
self . datastore . update_watch ( uuid = uuid , update_obj = { ' consecutive_filter_failures ' : c } )
def run ( self ) :
def run ( self ) :
now = time . time ( )
now = time . time ( )
while not self . app . config . exit . is_set ( ) :
while not self . app . config . exit . is_set ( ) :
update_handler = None
update_handler = None
@ -241,7 +309,8 @@ class update_worker(threading.Thread):
else :
else :
uuid = queued_item_data . item . get ( ' uuid ' )
uuid = queued_item_data . item . get ( ' uuid ' )
self . current_uuid = uuid
self . current_uuid = uuid
if uuid in list ( self . datastore . data [ ' watching ' ] . keys ( ) ) and self . datastore . data [ ' watching ' ] [ uuid ] . get ( ' url ' ) :
if uuid in list ( self . datastore . data [ ' watching ' ] . keys ( ) ) and self . datastore . data [ ' watching ' ] [ uuid ] . get (
' url ' ) :
changed_detected = False
changed_detected = False
contents = b ' '
contents = b ' '
process_changedetection_results = True
process_changedetection_results = True
@ -252,7 +321,8 @@ class update_worker(threading.Thread):
watch = self . datastore . data [ ' watching ' ] . get ( uuid )
watch = self . datastore . data [ ' watching ' ] . get ( uuid )
logger . info ( f " Processing watch UUID { uuid } Priority { queued_item_data . priority } URL { watch [ ' url ' ] } " )
logger . info ( f " Processing watch UUID { uuid } Priority { queued_item_data . priority } URL { watch [
' url ' ] } " )
now = time . time ( )
now = time . time ( )
try :
try :
@ -261,7 +331,6 @@ class update_worker(threading.Thread):
# Abort processing when the content was the same as the last fetch
# Abort processing when the content was the same as the last fetch
skip_when_same_checksum = queued_item_data . item . get ( ' skip_when_checksum_same ' )
skip_when_same_checksum = queued_item_data . item . get ( ' skip_when_checksum_same ' )
# Init a new 'difference_detection_processor', first look in processors
# Init a new 'difference_detection_processor', first look in processors
processor_module_name = f " changedetectionio.processors. { processor } .processor "
processor_module_name = f " changedetectionio.processors. { processor } .processor "
try :
try :
@ -314,16 +383,16 @@ class update_worker(threading.Thread):
else :
else :
extra_help = " , it ' s possible that the filters were found, but contained no usable text. "
extra_help = " , it ' s possible that the filters were found, but contained no usable text. "
self . datastore. update_watch( uuid = uuid , update_obj = {
self . _ update_watch( uuid = uuid , update_obj = {
' last_error ' : f " Got HTML content but no text found (With { e . status_code } reply code) { extra_help } "
' last_error ' : f " Got HTML content but no text found (With { e . status_code } reply code) { extra_help } "
} )
} , exception = e )
if e . screenshot :
if e . screenshot :
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
if e . xpath_data :
if e . xpath_data :
watch . save_xpath_data ( data = e . xpath_data )
watch . save_xpath_data ( data = e . xpath_data )
process_changedetection_results = False
process_changedetection_results = False
except content_fetchers_exceptions . Non200ErrorCodeReceived as e :
except content_fetchers_exceptions . Non200ErrorCodeReceived as e :
@ -345,7 +414,7 @@ class update_worker(threading.Thread):
if e . page_text :
if e . page_text :
watch . save_error_text ( contents = e . page_text )
watch . save_error_text ( contents = e . page_text )
self . datastore. update_watch( uuid = uuid , update_obj = { ' last_error ' : err_text } )
self . _ update_watch( uuid = uuid , update_obj = { ' last_error ' : err_text } , exception = e )
process_changedetection_results = False
process_changedetection_results = False
except FilterNotFoundInResponse as e :
except FilterNotFoundInResponse as e :
@ -353,7 +422,7 @@ class update_worker(threading.Thread):
continue
continue
err_text = " Warning, no filters were found, no change detection ran - Did the page change layout? update your Visual Filter if necessary. "
err_text = " Warning, no filters were found, no change detection ran - Did the page change layout? update your Visual Filter if necessary. "
self . datastore. update_watch( uuid = uuid , update_obj = { ' last_error ' : err_text } )
self . _ update_watch( uuid = uuid , update_obj = { ' last_error ' : err_text } , exception = None )
# Filter wasnt found, but we should still update the visual selector so that they can have a chance to set it up again
# Filter wasnt found, but we should still update the visual selector so that they can have a chance to set it up again
if e . screenshot :
if e . screenshot :
@ -375,7 +444,7 @@ class update_worker(threading.Thread):
self . send_filter_failure_notification ( uuid )
self . send_filter_failure_notification ( uuid )
c = 0
c = 0
self . datastore. update_watch( uuid = uuid , update_obj = { ' consecutive_filter_failures ' : c } )
self . _ update_watch( uuid = uuid , update_obj = { ' consecutive_filter_failures ' : c } , exception = None )
process_changedetection_results = False
process_changedetection_results = False
@ -384,16 +453,16 @@ class update_worker(threading.Thread):
process_changedetection_results = False
process_changedetection_results = False
changed_detected = False
changed_detected = False
except content_fetchers_exceptions . BrowserConnectError as e :
except content_fetchers_exceptions . BrowserConnectError as e :
self . datastore. update_watch( uuid = uuid ,
self . _ update_watch( uuid = uuid ,
update_obj = { ' last_error ' : e . msg
update_obj = { ' last_error ' : e . msg
}
}
)
, exception = e )
process_changedetection_results = False
process_changedetection_results = False
except content_fetchers_exceptions . BrowserFetchTimedOut as e :
except content_fetchers_exceptions . BrowserFetchTimedOut as e :
self . datastore. update_watch( uuid = uuid ,
self . _ update_watch( uuid = uuid ,
update_obj = { ' last_error ' : e . msg
update_obj = { ' last_error ' : e . msg
}
}
)
, exception = e )
process_changedetection_results = False
process_changedetection_results = False
except content_fetchers_exceptions . BrowserStepsStepException as e :
except content_fetchers_exceptions . BrowserStepsStepException as e :
@ -415,11 +484,11 @@ class update_worker(threading.Thread):
logger . debug ( f " BrowserSteps exception at step { error_step } { str ( e . original_e ) } " )
logger . debug ( f " BrowserSteps exception at step { error_step } { str ( e . original_e ) } " )
self . datastore. update_watch( uuid = uuid ,
self . _ update_watch( uuid = uuid ,
update_obj = { ' last_error ' : err_text ,
update_obj = { ' last_error ' : err_text ,
' browser_steps_last_error_step ' : error_step
' browser_steps_last_error_step ' : error_step
}
}
)
, exception = None )
if watch . get ( ' filter_failure_notification_send ' , False ) :
if watch . get ( ' filter_failure_notification_send ' , False ) :
c = watch . get ( ' consecutive_filter_failures ' , 5 )
c = watch . get ( ' consecutive_filter_failures ' , 5 )
@ -433,27 +502,31 @@ class update_worker(threading.Thread):
self . send_step_failure_notification ( watch_uuid = uuid , step_n = e . step_n )
self . send_step_failure_notification ( watch_uuid = uuid , step_n = e . step_n )
c = 0
c = 0
self . datastore. update_watch( uuid = uuid , update_obj = { ' consecutive_filter_failures ' : c } )
self . _ update_watch( uuid = uuid , update_obj = { ' consecutive_filter_failures ' : c } , exception = None )
process_changedetection_results = False
process_changedetection_results = False
except content_fetchers_exceptions . EmptyReply as e :
except content_fetchers_exceptions . EmptyReply as e :
# Some kind of custom to-str handler in the exception handler that does this?
# Some kind of custom to-str handler in the exception handler that does this?
err_text = " EmptyReply - try increasing ' Wait seconds before extracting text ' , Status Code {} " . format ( e . status_code )
err_text = " EmptyReply - try increasing ' Wait seconds before extracting text ' , Status Code {} " . format (
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
e . status_code )
' last_check_status ' : e . status_code } )
self . _update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
' last_check_status ' : e . status_code } ,
exception = e )
process_changedetection_results = False
process_changedetection_results = False
except content_fetchers_exceptions . ScreenshotUnavailable as e :
except content_fetchers_exceptions . ScreenshotUnavailable as e :
err_text = " Screenshot unavailable, page did not render fully in the expected time or page was too long - try increasing ' Wait seconds before extracting text ' "
err_text = " Screenshot unavailable, page did not render fully in the expected time or page was too long - try increasing ' Wait seconds before extracting text ' "
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
self . _update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
' last_check_status ' : e . status_code } )
' last_check_status ' : e . status_code } ,
exception = e )
process_changedetection_results = False
process_changedetection_results = False
except content_fetchers_exceptions . JSActionExceptions as e :
except content_fetchers_exceptions . JSActionExceptions as e :
err_text = " Error running JS Actions - Page request - " + e . message
err_text = " Error running JS Actions - Page request - " + e . message
if e . screenshot :
if e . screenshot :
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
self . _update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
' last_check_status ' : e . status_code } )
' last_check_status ' : e . status_code } ,
exception = e )
process_changedetection_results = False
process_changedetection_results = False
except content_fetchers_exceptions . PageUnloadable as e :
except content_fetchers_exceptions . PageUnloadable as e :
err_text = " Page request from server didnt respond correctly "
err_text = " Page request from server didnt respond correctly "
@ -463,20 +536,22 @@ class update_worker(threading.Thread):
if e . screenshot :
if e . screenshot :
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
watch . save_screenshot ( screenshot = e . screenshot , as_error = True )
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
self . _update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text ,
' last_check_status ' : e . status_code ,
' last_check_status ' : e . status_code ,
' has_ldjson_price_data ' : None } )
' has_ldjson_price_data ' : None } ,
exception = e )
process_changedetection_results = False
process_changedetection_results = False
except content_fetchers_exceptions . BrowserStepsInUnsupportedFetcher as e :
except content_fetchers_exceptions . BrowserStepsInUnsupportedFetcher as e :
err_text = " This watch has Browser Steps configured and so it cannot run with the ' Basic fast Plaintext/HTTP Client ' , either remove the Browser Steps or select a Chrome fetcher. "
err_text = " This watch has Browser Steps configured and so it cannot run with the ' Basic fast Plaintext/HTTP Client ' , either remove the Browser Steps or select a Chrome fetcher. "
self . datastore . update_watch ( uuid = uuid , update_obj = { ' last_error ' : err_text } )
self . _update_watch ( uuid = uuid , update_obj = {
' last_error ' : err_text } , exception = e )
process_changedetection_results = False
process_changedetection_results = False
logger . error ( f " Exception (BrowserStepsInUnsupportedFetcher) reached processing watch UUID: { uuid } " )
logger . error ( f " Exception (BrowserStepsInUnsupportedFetcher) reached processing watch UUID: { uuid } " )
except Exception as e :
except Exception as e :
logger . error ( f " Exception reached processing watch UUID: { uuid } " )
logger . error ( f " Exception reached processing watch UUID: { uuid } " )
logger . error ( str ( e ) )
logger . error ( str ( e ) )
self . datastore. update_watch( uuid = uuid , update_obj = { ' last_error ' : " Exception: " + str ( e ) } )
self . _ update_watch( uuid = uuid , update_obj = { ' last_error ' : " Exception: " + str ( e ) } , exception = e )
# Other serious error
# Other serious error
process_changedetection_results = False
process_changedetection_results = False
@ -512,7 +587,7 @@ class update_worker(threading.Thread):
# Now update after running everything
# Now update after running everything
timestamp = round ( time . time ( ) )
timestamp = round ( time . time ( ) )
try :
try :
self . datastore. update_watch( uuid = uuid , update_obj = update_obj )
self . _ update_watch( uuid = uuid , update_obj = update_obj , exception = None )
# Also save the snapshot on the first time checked, "last checked" will always be updated, so we just check history length.
# Also save the snapshot on the first time checked, "last checked" will always be updated, so we just check history length.
@ -546,11 +621,12 @@ class update_worker(threading.Thread):
if not watch . get ( ' notification_muted ' ) :
if not watch . get ( ' notification_muted ' ) :
self . send_content_changed_notification ( watch_uuid = uuid )
self . send_content_changed_notification ( watch_uuid = uuid )
except Exception as e :
except Exception as e :
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
logger . critical ( " !!!! Exception in update_worker while processing process_changedetection_results !!! " )
logger . critical ( " !!!! Exception in update_worker while processing process_changedetection_results !!! " )
logger . critical ( str ( e ) )
logger . critical ( str ( e ) )
self . datastore. update_watch( uuid = uuid , update_obj = { ' last_error ' : str ( e ) } )
self . _ update_watch( uuid = uuid , update_obj = { ' last_error ' : str ( e ) } , exception = e )
# Always record that we atleast tried
# Always record that we atleast tried
@ -565,10 +641,10 @@ class update_worker(threading.Thread):
except Exception as e :
except Exception as e :
pass
pass
self . datastore. update_watch( uuid = uuid , update_obj = { ' fetch_time ' : round ( time . time ( ) - now , 3 ) ,
self . _ update_watch( uuid = uuid , update_obj = { ' fetch_time ' : round ( time . time ( ) - now , 3 ) ,
' last_checked ' : round ( time . time ( ) ) ,
' last_checked ' : round ( time . time ( ) ) ,
' check_count ' : count
' check_count ' : count
} )
} , exception = None if process_changedetection_results else Exception ( " Unknown " ) )
self . current_uuid = None # Done
self . current_uuid = None # Done