# HTML to TEXT/JSON DIFFERENCE FETCHER import hashlib import json import logging import os import re import urllib3 from changedetectionio import content_fetcher, html_tools from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT from copy import deepcopy from . import difference_detection_processor from ..html_tools import PERL_STYLE_REGEX urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) name = 'Webpage Text/HTML, JSON and PDF changes' description = 'Detects all text changes where possible' json_filter_prefixes = ['json:', 'jq:'] class FilterNotFoundInResponse(ValueError): def __init__(self, msg): ValueError.__init__(self, msg) class PDFToHTMLToolNotFound(ValueError): def __init__(self, msg): ValueError.__init__(self, msg) # Some common stuff here that can be moved to a base class # (set_proxy_from_list) class perform_site_check(difference_detection_processor): screenshot = None xpath_data = None def __init__(self, *args, datastore, **kwargs): super().__init__(*args, **kwargs) self.datastore = datastore def run(self, uuid, skip_when_checksum_same=True, preferred_proxy=None): changed_detected = False screenshot = False # as bytes stripped_text_from_html = "" # DeepCopy so we can be sure we don't accidently change anything by reference watch = deepcopy(self.datastore.data['watching'].get(uuid)) if not watch: raise Exception("Watch no longer exists.") # Protect against file:// access if re.search(r'^file', watch.get('url', ''), re.IGNORECASE) and not os.getenv('ALLOW_FILE_URI', False): raise Exception( "file:// type access is denied for security reasons." ) # Unset any existing notification error update_obj = {'last_notification_error': False, 'last_error': False} # Tweak the base config with the per-watch ones request_headers = watch.get('headers', []) request_headers.update(self.datastore.get_all_base_headers()) request_headers.update(self.datastore.get_all_headers_in_textfile_for_watch(uuid=uuid)) # https://github.com/psf/requests/issues/4525 # Requests doesnt yet support brotli encoding, so don't put 'br' here, be totally sure that the user cannot # do this by accident. if 'Accept-Encoding' in request_headers and "br" in request_headers['Accept-Encoding']: request_headers['Accept-Encoding'] = request_headers['Accept-Encoding'].replace(', br', '') timeout = self.datastore.data['settings']['requests'].get('timeout') url = watch.link request_body = self.datastore.data['watching'][uuid].get('body') request_method = self.datastore.data['watching'][uuid].get('method') ignore_status_codes = self.datastore.data['watching'][uuid].get('ignore_status_codes', False) # source: support is_source = False if url.startswith('source:'): url = url.replace('source:', '') is_source = True # Pluggable content fetcher prefer_backend = watch.get_fetch_backend if not prefer_backend or prefer_backend == 'system': prefer_backend = self.datastore.data['settings']['application']['fetch_backend'] if hasattr(content_fetcher, prefer_backend): klass = getattr(content_fetcher, prefer_backend) else: # If the klass doesnt exist, just use a default klass = getattr(content_fetcher, "html_requests") if preferred_proxy: proxy_id = preferred_proxy else: proxy_id = self.datastore.get_preferred_proxy_for_watch(uuid=uuid) proxy_url = None if proxy_id: proxy_url = self.datastore.proxy_list.get(proxy_id).get('url') print("UUID {} Using proxy {}".format(uuid, proxy_url)) fetcher = klass(proxy_override=proxy_url) # Configurable per-watch or global extra delay before extracting text (for webDriver types) system_webdriver_delay = self.datastore.data['settings']['application'].get('webdriver_delay', None) if watch['webdriver_delay'] is not None: fetcher.render_extract_delay = watch.get('webdriver_delay') elif system_webdriver_delay is not None: fetcher.render_extract_delay = system_webdriver_delay # Possible conflict if prefer_backend == 'html_webdriver': fetcher.browser_steps = watch.get('browser_steps', None) fetcher.browser_steps_screenshot_path = os.path.join(self.datastore.datastore_path, uuid) if watch.get('webdriver_js_execute_code') is not None and watch.get('webdriver_js_execute_code').strip(): fetcher.webdriver_js_execute_code = watch.get('webdriver_js_execute_code') # requests for PDF's, images etc should be passwd the is_binary flag is_binary = watch.is_pdf fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_codes, watch.get('include_filters'), is_binary=is_binary) fetcher.quit() self.screenshot = fetcher.screenshot self.xpath_data = fetcher.xpath_data # Track the content type update_obj['content_type'] = fetcher.get_all_headers().get('content-type', '').lower() # Watches added automatically in the queue manager will skip if its the same checksum as the previous run # Saves a lot of CPU update_obj['previous_md5_before_filters'] = hashlib.md5(fetcher.content.encode('utf-8')).hexdigest() if skip_when_checksum_same: if update_obj['previous_md5_before_filters'] == watch.get('previous_md5_before_filters'): raise content_fetcher.checksumFromPreviousCheckWasTheSame() # Fetching complete, now filters # @todo move to class / maybe inside of fetcher abstract base? # @note: I feel like the following should be in a more obvious chain system # - Check filter text # - Is the checksum different? # - Do we convert to JSON? # https://stackoverflow.com/questions/41817578/basic-method-chaining ? # return content().textfilter().jsonextract().checksumcompare() ? is_json = 'application/json' in fetcher.get_all_headers().get('content-type', '').lower() is_html = not is_json # source: support, basically treat it as plaintext if is_source: is_html = False is_json = False if watch.is_pdf or 'application/pdf' in fetcher.get_all_headers().get('content-type', '').lower(): from shutil import which tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml") if not which(tool): raise PDFToHTMLToolNotFound("Command-line `{}` tool was not found in system PATH, was it installed?".format(tool)) import subprocess proc = subprocess.Popen( [tool, '-stdout', '-', '-s', 'out.pdf', '-i'], stdout=subprocess.PIPE, stdin=subprocess.PIPE) proc.stdin.write(fetcher.raw_content) proc.stdin.close() fetcher.content = proc.stdout.read().decode('utf-8') proc.wait(timeout=60) # Add a little metadata so we know if the file changes (like if an image changes, but the text is the same # @todo may cause problems with non-UTF8? metadata = "
Added by changedetection.io: Document checksum - {} Filesize - {} bytes
".format( hashlib.md5(fetcher.raw_content).hexdigest().upper(), len(fetcher.content)) fetcher.content = fetcher.content.replace('