Use a thread lock and a cleaner separation of concerns between the main thread and the site status fetch

pull/11/head 0.23
Leigh Morresi 4 years ago
parent ef2dd44e7e
commit 5e31ae86d0

@ -33,7 +33,6 @@ from flask import Flask, render_template, request, send_file, send_from_director
# Local # Local
import store import store
import fetch_site_status
running_update_threads = [] running_update_threads = []
ticker_thread = None ticker_thread = None
@ -400,14 +399,18 @@ class Worker(threading.Thread):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
def run(self): def run(self):
import fetch_site_status
try: try:
while True: while True:
uuid = self.q.get() # Blocking uuid = self.q.get() # Blocking
self.current_uuid = uuid self.current_uuid = uuid
# A little safety protection
if uuid in list( datastore.data['watching'].keys()): if uuid in list(datastore.data['watching'].keys()):
fetch_site_status.perform_site_check(uuid=uuid, datastore=datastore) update_handler = fetch_site_status.perform_site_check(uuid=uuid, datastore=datastore)
self.current_uuid = None # Done datastore.update_watch(uuid=uuid, update_obj=update_handler.update_data)
self.current_uuid = None # Done
self.q.task_done() self.q.task_done()
except KeyboardInterrupt: except KeyboardInterrupt:

@ -5,11 +5,14 @@ import os
import re import re
from inscriptis import get_text from inscriptis import get_text
# Doesn't feel right having 'datastore' as a var here, perhaps this class can inherit from datastore/abstract # Some common stuff here that can be moved to a base class
# but on the other hand, I dont want a new instantiation of the that datastore object every time, due to it reading the
# JSON store, setting vars, writing etc.
class perform_site_check(): class perform_site_check():
# New state that is set after a check
# Return value dict
update_obj = {}
def __init__(self, *args, uuid=False, datastore, **kwargs): def __init__(self, *args, uuid=False, datastore, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.timestamp = int(time.time()) # used for storage etc too self.timestamp = int(time.time()) # used for storage etc too
@ -22,6 +25,11 @@ class perform_site_check():
self.ensure_output_path() self.ensure_output_path()
self.run() self.run()
# Current state of what needs to be updated
@property
def update_data(self):
return self.update_obj
def save_firefox_screenshot(self, uuid, output): def save_firefox_screenshot(self, uuid, output):
# @todo call selenium or whatever # @todo call selenium or whatever
return return
@ -62,10 +70,6 @@ class perform_site_check():
if 'Accept-Encoding' in request_headers and "br" in request_headers['Accept-Encoding']: if 'Accept-Encoding' in request_headers and "br" in request_headers['Accept-Encoding']:
request_headers['Accept-Encoding'] = request_headers['Accept-Encoding'].replace(', br', '') request_headers['Accept-Encoding'] = request_headers['Accept-Encoding'].replace(', br', '')
print("Checking", self.url)
try: try:
timeout = self.datastore.data['settings']['requests']['timeout'] timeout = self.datastore.data['settings']['requests']['timeout']
except KeyError: except KeyError:
@ -80,9 +84,12 @@ class perform_site_check():
stripped_text_from_html = get_text(r.text) stripped_text_from_html = get_text(r.text)
# Usually from networkIO/requests level # Usually from networkIO/requests level
except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e: except (requests.exceptions.ConnectionError, requests.exceptions.ReadTimeout) as e:
self.datastore.update_watch(self.uuid, 'last_error', str(e)) self.update_obj["last_error"] = str(e)
print(str(e)) print(str(e))
except requests.exceptions.MissingSchema: except requests.exceptions.MissingSchema:
@ -90,35 +97,36 @@ class perform_site_check():
# Usually from html2text level # Usually from html2text level
except UnicodeDecodeError as e: except UnicodeDecodeError as e:
self.datastore.update_watch(self.uuid, 'last_error', str(e))
self.update_obj["last_error"] = str(e)
print(str(e)) print(str(e))
# figure out how to deal with this cleaner.. # figure out how to deal with this cleaner..
# 'utf-8' codec can't decode byte 0xe9 in position 480: invalid continuation byte # 'utf-8' codec can't decode byte 0xe9 in position 480: invalid continuation byte
else: else:
# We rely on the actual text in the html output.. many sites have random script vars etc,
# in the future we'll implement other mechanisms.
# We rely on the actual text in the html output.. many sites have random script vars etc self.update_obj["last_check_status"] = r.status_code
self.datastore.update_watch(self.uuid, 'last_error', False) self.update_obj["last_error"] = False
self.datastore.update_watch(self.uuid, 'last_check_status', r.status_code)
fetched_md5 = hashlib.md5(stripped_text_from_html.encode('utf-8')).hexdigest() fetched_md5 = hashlib.md5(stripped_text_from_html.encode('utf-8')).hexdigest()
if self.current_md5 != fetched_md5: if self.current_md5 != fetched_md5:
# Dont confuse people by putting last-changed, when it actually just changed from nothing.. # Don't confuse people by updating as last-changed, when it actually just changed from None..
if self.datastore.get_val(self.uuid, 'previous_md5') is not None: if self.datastore.get_val(self.uuid, 'previous_md5') is not None:
self.datastore.update_watch(self.uuid, 'last_changed', self.timestamp) self.update_obj["last_changed"] = self.timestamp
self.update_obj["previous_md5"] = fetched_md5
self.datastore.update_watch(self.uuid, 'previous_md5', fetched_md5)
self.save_response_html_output(r.text) self.save_response_html_output(r.text)
output_filepath = self.save_response_stripped_output(stripped_text_from_html) output_filepath = self.save_response_stripped_output(stripped_text_from_html)
# Update history with the stripped text for future reference, this will also mean we save the first # Update history with the stripped text for future reference, this will also mean we save the first
# attempt because 'self.current_md5 != fetched_md5' (current_md5 will be None when not run) timestamp = str(self.timestamp)
# need to learn more about attr/setters/getters self.update_obj.update({"history": {timestamp: output_filepath}})
history = self.datastore.get_val(self.uuid, 'history')
history.update(dict([(str(self.timestamp), output_filepath)])) self.update_obj["last_checked"] = self.timestamp
self.datastore.update_watch(self.uuid, 'history', history)
self.datastore.update_watch(self.uuid, 'last_checked', int(time.time()))
pass

@ -3,35 +3,37 @@ import uuid as uuid_builder
import validators import validators
import os.path import os.path
from os import path from os import path
from threading import Lock, Thread
# Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods? # Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods?
# Open a github issue if you know something :) # Open a github issue if you know something :)
# https://stackoverflow.com/questions/6190468/how-to-trigger-function-on-value-change # https://stackoverflow.com/questions/6190468/how-to-trigger-function-on-value-change
class ChangeDetectionStore: class ChangeDetectionStore:
lock = Lock()
def __init__(self): def __init__(self):
self.needs_write = False self.needs_write = False
self.__data = { self.__data = {
'note' : "Hello! If you change this file manually, please be sure to restart your changedetection.io instance!", 'note': "Hello! If you change this file manually, please be sure to restart your changedetection.io instance!",
'watching': {}, 'watching': {},
'tag': "0.22", 'tag': "0.23",
'settings': { 'settings': {
'headers': { 'headers': {
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.66 Safari/537.36', 'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.66 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
'Accept-Encoding': 'gzip, deflate', # No support for brolti in python requests yet. 'Accept-Encoding': 'gzip, deflate', # No support for brolti in python requests yet.
'Accept-Language': 'en-GB,en-US;q=0.9,en;' 'Accept-Language': 'en-GB,en-US;q=0.9,en;'
}, },
'requests': { 'requests': {
'timeout': 15, # Default 15 seconds 'timeout': 15, # Default 15 seconds
'minutes_between_check': 3 * 60, # Default 3 hours 'minutes_between_check': 3 * 60, # Default 3 hours
'workers': 10 # Number of threads, lower is better for slow connections 'workers': 10 # Number of threads, lower is better for slow connections
} }
} }
} }
# Base definition for all watchers # Base definition for all watchers
self.generic_definition = { self.generic_definition = {
'url': None, 'url': None,
@ -41,8 +43,8 @@ class ChangeDetectionStore:
'title': None, 'title': None,
'previous_md5': None, 'previous_md5': None,
'uuid': str(uuid_builder.uuid4()), 'uuid': str(uuid_builder.uuid4()),
'headers' : {}, # Extra headers to send 'headers': {}, # Extra headers to send
'history' : {} # Dict of timestamp and output stripped filename 'history': {} # Dict of timestamp and output stripped filename
} }
if path.isfile('/source.txt'): if path.isfile('/source.txt'):
@ -67,7 +69,6 @@ class ChangeDetectionStore:
if 'requests' in from_disk['settings']: if 'requests' in from_disk['settings']:
self.__data['settings']['requests'].update(from_disk['settings']['requests']) self.__data['settings']['requests'].update(from_disk['settings']['requests'])
# Reinitialise each `watching` with our generic_definition in the case that we add a new var in the future. # Reinitialise each `watching` with our generic_definition in the case that we add a new var in the future.
# @todo pretty sure theres a python we todo this with an abstracted(?) object! # @todo pretty sure theres a python we todo this with an abstracted(?) object!
i = 0 i = 0
@ -85,20 +86,28 @@ class ChangeDetectionStore:
self.add_watch(url='https://www.gov.uk/coronavirus', tag='Covid') self.add_watch(url='https://www.gov.uk/coronavirus', tag='Covid')
self.add_watch(url='https://changedetection.io', tag='Tech news') self.add_watch(url='https://changedetection.io', tag='Tech news')
def update_watch(self, uuid, update_obj):
# self.entryVariable.get()
def update_watch(self, uuid, val, var):
self.__data['watching'][uuid].update({val: var}) self.lock.acquire()
self.needs_write = True
# In python 3.9 we have the |= dict operator, but that still will lose data on nested structures...
for dict_key, d in self.generic_definition.items():
if isinstance(d, dict) and dict_key in update_obj:
self.__data['watching'][uuid][dict_key].update(update_obj[dict_key])
del(update_obj[dict_key])
# Update with the remaining values
self.__data['watching'][uuid].update(update_obj)
self.needs_write = True
self.lock.release()
@property @property
def data(self): def data(self):
return self.__data return self.__data
def get_all_tags(self): def get_all_tags(self):
tags=[] tags = []
for uuid, watch in self.data['watching'].items(): for uuid, watch in self.data['watching'].items():
# Support for comma separated list of tags. # Support for comma separated list of tags.
@ -111,10 +120,11 @@ class ChangeDetectionStore:
return tags return tags
def delete(self, uuid): def delete(self, uuid):
# Probably their should be dict...
del(self.__data['watching'][uuid])
self.needs_write = True
self.lock.acquire()
del (self.__data['watching'][uuid])
self.needs_write = True
self.lock.release()
def url_exists(self, url): def url_exists(self, url):
@ -130,7 +140,7 @@ class ChangeDetectionStore:
return self.data['watching'][uuid].get(val) return self.data['watching'][uuid].get(val)
def add_watch(self, url, tag): def add_watch(self, url, tag):
self.lock.acquire()
print("Adding", url, tag) print("Adding", url, tag)
# # @todo deal with exception # # @todo deal with exception
# validators.url(url) # validators.url(url)
@ -146,13 +156,15 @@ class ChangeDetectionStore:
self.data['watching'][new_uuid] = _blank self.data['watching'][new_uuid] = _blank
self.needs_write = True self.needs_write = True
self.lock.release()
return new_uuid return new_uuid
def sync_to_json(self): def sync_to_json(self):
print ("Saving index") print("Saving index")
self.lock.acquire()
with open('/datastore/url-watches.json', 'w') as json_file: with open('/datastore/url-watches.json', 'w') as json_file:
json.dump(self.data, json_file, indent=4) json.dump(self.data, json_file, indent=4)
self.needs_write = False self.needs_write = False
self.lock.release()
# body of the constructor # body of the constructor

Loading…
Cancel
Save