"""Helpers."""

import errno
import hashlib
import json
import logging
import os
import os.path
import sys
from hashlib import md5
from typing import (
    Callable,
    Dict,
    Hashable,
    Iterable,
    Optional,
    TypeVar,
    Union,
    cast,
)

from filelock import FileLock
import requests

LOG = logging.getLogger(__name__)

# Module-wide latch so the "unable to cache" warning is emitted at most once
# per process, no matter how many writes fail.
_DID_LOG_UNABLE_TO_CACHE = False

T = TypeVar("T")  # pylint: disable=invalid-name


def get_pkg_unique_identifier() -> str:
    """
    Generate an identifier unique to the python version, tldextract version, and python instance.

    This will prevent interference between virtualenvs and issues that might arise when installing
    a new version of tldextract
    """
    try:
        # pylint: disable=import-outside-toplevel
        from tldextract._version import version
    except ImportError:
        version = "dev"

    tldextract_version = "tldextract-" + version
    python_env_name = os.path.basename(sys.prefix)
    # just to handle the edge case of two identically named python environments
    python_binary_path_short_hash = hashlib.md5(sys.prefix.encode("utf-8")).hexdigest()[
        :6
    ]
    # Everything but the trailing `serial` field, e.g. "3.11.4.final".
    python_version = ".".join(str(v) for v in sys.version_info[:-1])
    identifier_parts = [
        python_version,
        python_env_name,
        python_binary_path_short_hash,
        tldextract_version,
    ]
    return "__".join(identifier_parts)


def get_cache_dir() -> str:
    """
    Get a cache dir that we have permission to write to.

    Try to follow the XDG standard, but if that doesn't work fallback to the package directory
    http://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html

    Resolution order: ``$TLDEXTRACT_CACHE``, then ``$XDG_CACHE_HOME``, then
    ``$HOME/.cache``, then a ``.suffix_cache/`` directory next to this module.
    """
    cache_dir = os.environ.get("TLDEXTRACT_CACHE", None)
    if cache_dir is not None:
        return cache_dir

    xdg_cache_home = os.getenv("XDG_CACHE_HOME", None)
    if xdg_cache_home is None:
        user_home = os.getenv("HOME", None)
        if user_home:
            xdg_cache_home = os.path.join(user_home, ".cache")

    if xdg_cache_home is not None:
        return os.path.join(
            xdg_cache_home, "python-tldextract", get_pkg_unique_identifier()
        )

    # fallback to trying to use package directory itself
    return os.path.join(os.path.dirname(__file__), ".suffix_cache/")


def _log_unable_to_cache(
    namespace: str,
    key: Union[str, Dict[str, Hashable]],
    cache_filepath: str,
    exc: OSError,
) -> None:
    """Warn, once per process, that a value could not be written to the cache."""
    global _DID_LOG_UNABLE_TO_CACHE  # pylint: disable=global-statement
    if _DID_LOG_UNABLE_TO_CACHE:
        return
    LOG.warning(
        "unable to cache %s.%s in %s. This could refresh the "
        "Public Suffix List over HTTP every app startup. "
        "Construct your `TLDExtract` with a writable `cache_dir` or "
        "set `cache_dir=None` to silence this warning. %s",
        namespace,
        key,
        cache_filepath,
        exc,
    )
    _DID_LOG_UNABLE_TO_CACHE = True


class DiskCache:
    """Disk cache that only works for jsonable values."""

    def __init__(self, cache_dir: Optional[str], lock_timeout: int = 20):
        """Create a cache rooted at ``cache_dir``.

        A falsy ``cache_dir`` (``None`` or ``""``) disables the cache.
        ``lock_timeout`` is passed to ``FileLock`` as its ``timeout``.
        """
        self.enabled = bool(cache_dir)
        # Apply the `or ""` fallback BEFORE str(): str(None) is the truthy
        # string "None", which would otherwise become a relative directory.
        self.cache_dir = os.path.expanduser(str(cache_dir or ""))
        self.lock_timeout = lock_timeout
        # using a unique extension provides some safety that an incorrectly set cache_dir
        # combined with a call to `.clear()` wont wipe someones hard drive
        self.file_ext = ".tldextract.json"

    def get(self, namespace: str, key: Union[str, Dict[str, Hashable]]) -> object:
        """Retrieve a value from the disk cache.

        Raises:
            KeyError: if the cache is disabled, the entry does not exist, or
                the cache file cannot be read or parsed as JSON.
        """
        if not self.enabled:
            raise KeyError("Cache is disabled")
        cache_filepath = self._key_to_cachefile_path(namespace, key)

        if not os.path.isfile(cache_filepath):
            raise KeyError("namespace: " + namespace + " key: " + repr(key))
        try:
            # `set` writes ASCII-only JSON (json.dump's default), so utf-8 also
            # reads files written earlier under any ASCII-compatible locale.
            with open(cache_filepath, encoding="utf-8") as cache_file:
                return json.load(cache_file)
        except (OSError, ValueError) as exc:
            LOG.error("error reading TLD cache file %s: %s", cache_filepath, exc)
            raise KeyError("namespace: " + namespace + " key: " + repr(key)) from None

    def set(
        self, namespace: str, key: Union[str, Dict[str, Hashable]], value: object
    ) -> None:
        """Set a value in the disk cache.

        Does nothing when the cache is disabled. An ``OSError`` while creating
        the directory or writing the file is logged (once per process) rather
        than raised.
        """
        if not self.enabled:
            return

        cache_filepath = self._key_to_cachefile_path(namespace, key)

        try:
            _make_dir(cache_filepath)
            with open(cache_filepath, "w", encoding="utf-8") as cache_file:
                json.dump(value, cache_file)
        except OSError as ioe:
            _log_unable_to_cache(namespace, key, cache_filepath, ioe)

    def clear(self) -> None:
        """Clear the disk cache.

        Only files ending in this cache's extension (and their ``.lock``
        companions) are removed. A disabled cache has nothing to clear.
        """
        if not self.enabled:
            return

        removable_suffixes = (self.file_ext, self.file_ext + ".lock")
        for root, _, files in os.walk(self.cache_dir):
            for filename in files:
                if not filename.endswith(removable_suffixes):
                    continue
                try:
                    os.unlink(os.path.join(root, filename))
                except OSError as exc:
                    # errno.ENOENT == "No such file or directory": the file is
                    # already gone (e.g. removed concurrently), which is fine.
                    if exc.errno != errno.ENOENT:
                        raise

    def _key_to_cachefile_path(
        self, namespace: str, key: Union[str, Dict[str, Hashable]]
    ) -> str:
        """Return ``<cache_dir>/<namespace>/<md5 of repr(key)><file_ext>``."""
        namespace_path = os.path.join(self.cache_dir, namespace)
        hashed_key = _make_cache_key(key)
        return os.path.join(namespace_path, hashed_key + self.file_ext)

    def run_and_cache(
        self,
        func: Callable[..., T],
        namespace: str,
        kwargs: Dict[str, Hashable],
        hashed_argnames: Iterable[str],
    ) -> T:
        """Call ``func(**kwargs)``, caching its result on disk.

        Args:
            func: callable to run on a cache miss.
            namespace: cache sub-directory for the entry.
            kwargs: keyword arguments passed to ``func``.
            hashed_argnames: names of the ``kwargs`` entries that make up the
                cache key.

        Returns:
            The cached value when present, otherwise the fresh result of
            ``func``. When the cache is disabled or its directory cannot be
            created, ``func`` is simply called without caching.
        """
        if not self.enabled:
            return func(**kwargs)

        # Materialise once so a one-shot iterator is not exhausted by the first
        # membership test. A plain str keeps its existing `in` semantics.
        hashed_names = (
            hashed_argnames
            if isinstance(hashed_argnames, str)
            else frozenset(hashed_argnames)
        )
        key_args = {k: v for k, v in kwargs.items() if k in hashed_names}
        cache_filepath = self._key_to_cachefile_path(namespace, key_args)
        lock_path = cache_filepath + ".lock"
        try:
            _make_dir(cache_filepath)
        except OSError as ioe:
            _log_unable_to_cache(namespace, key_args, cache_filepath, ioe)
            return func(**kwargs)

        # Disable lint of 3rd party (see also https://github.com/tox-dev/py-filelock/issues/102)
        # pylint: disable-next=abstract-class-instantiated
        with FileLock(lock_path, timeout=self.lock_timeout):
            try:
                result = cast(T, self.get(namespace=namespace, key=key_args))
            except KeyError:
                result = func(**kwargs)
                self.set(namespace=namespace, key=key_args, value=result)

            return result

    def cached_fetch_url(
        self, session: requests.Session, url: str, timeout: Union[float, int, None]
    ) -> str:
        """Get a url but cache the response."""
        # Only `url` is part of the cache key; session and timeout are not.
        return self.run_and_cache(
            func=_fetch_url,
            namespace="urls",
            kwargs={"session": session, "url": url, "timeout": timeout},
            hashed_argnames=["url"],
        )


def _fetch_url(
    session: requests.Session, url: str, timeout: Union[float, int, None]
) -> str:
    """GET ``url`` with ``session`` and return the response body as text.

    Calls ``response.raise_for_status()``, so HTTP error statuses propagate as
    exceptions.
    """
    response = session.get(url, timeout=timeout)
    response.raise_for_status()
    text = response.text

    if not isinstance(text, str):
        text = str(text, "utf-8")

    return text


def _make_cache_key(inputs: Union[str, Dict[str, Hashable]]) -> str:
    """Return the hex MD5 digest of ``repr(inputs)``, used as a file name."""
    key = repr(inputs)
    return md5(key.encode("utf8")).hexdigest()


def _make_dir(filename: str) -> None:
    """Make the parent directory of ``filename`` if it doesn't already exist."""
    # exist_ok=True avoids the check-then-create race between processes.
    os.makedirs(os.path.dirname(filename), exist_ok=True)