"""Helpers."""
import errno
import hashlib
import json
import logging
import os
import os.path
import sys
from hashlib import md5
from typing import (
Callable,
Dict,
Hashable,
Iterable,
Optional,
TypeVar,
Union,
cast,
)
from filelock import FileLock
import requests
LOG = logging.getLogger(__name__)
_DID_LOG_UNABLE_TO_CACHE = False
T = TypeVar("T") # pylint: disable=invalid-name
def get_pkg_unique_identifier() -> str:
    """
    Generate an identifier unique to the python version, tldextract version, and python instance.

    This will prevent interference between virtualenvs and issues that might arise when installing
    a new version of tldextract
    """
    try:
        # pylint: disable=import-outside-toplevel
        from tldextract._version import version
    except ImportError:
        version = "dev"

    # Everything from version_info except the final serial number,
    # e.g. "3.10.4.final".
    release = ".".join(str(part) for part in sys.version_info[:-1])
    env_name = os.path.basename(sys.prefix)
    # A short hash of the interpreter prefix disambiguates two identically
    # named python environments living at different paths.
    prefix_hash = hashlib.md5(sys.prefix.encode("utf-8")).hexdigest()[:6]

    return "__".join([release, env_name, prefix_hash, f"tldextract-{version}"])
def get_cache_dir() -> str:
    """
    Get a cache dir that we have permission to write to.

    Try to follow the XDG standard, but if that doesn't work fallback to the package directory
    http://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html
    """
    # An explicit override always wins.
    explicit_dir = os.environ.get("TLDEXTRACT_CACHE", None)
    if explicit_dir is not None:
        return explicit_dir

    xdg_cache = os.getenv("XDG_CACHE_HOME", None)
    if xdg_cache is None:
        # Derive the XDG default (~/.cache) when HOME is available.
        home_dir = os.getenv("HOME", None)
        if home_dir:
            xdg_cache = os.path.join(home_dir, ".cache")

    if xdg_cache is not None:
        return os.path.join(
            xdg_cache, "python-tldextract", get_pkg_unique_identifier()
        )

    # fallback to trying to use package directory itself
    return os.path.join(os.path.dirname(__file__), ".suffix_cache/")
class DiskCache:
    """Disk cache that only works for JSON-serializable values.

    Each (namespace, key) pair maps to one JSON file under ``cache_dir``.
    Failures to write are logged once per process and otherwise swallowed,
    so an unwritable cache directory degrades to "no caching" instead of
    crashing the caller.
    """

    def __init__(self, cache_dir: Optional[str], lock_timeout: int = 20):
        """Root the cache at `cache_dir`; a falsy value disables caching."""
        self.enabled = bool(cache_dir)
        # Bug fix: was `str(cache_dir) or ""`. str(None) is the truthy
        # string "None", so a disabled cache (cache_dir=None) previously got
        # cache_dir == "None" -- a real relative path that `.clear()` would
        # then walk. `cache_dir or ""` maps None/"" to "" as intended.
        self.cache_dir = os.path.expanduser(str(cache_dir or ""))
        self.lock_timeout = lock_timeout
        # using a unique extension provides some safety that an incorrectly set cache_dir
        # combined with a call to `.clear()` won't wipe someone's hard drive
        self.file_ext = ".tldextract.json"

    def get(self, namespace: str, key: Union[str, Dict[str, Hashable]]) -> object:
        """Retrieve a value from the disk cache.

        Raises:
            KeyError: if caching is disabled, the entry is absent, or the
                cache file cannot be read or parsed.
        """
        if not self.enabled:
            raise KeyError("Cache is disabled")
        cache_filepath = self._key_to_cachefile_path(namespace, key)

        if not os.path.isfile(cache_filepath):
            raise KeyError("namespace: " + namespace + " key: " + repr(key))
        try:
            # Explicit encoding: cache files are always written as UTF-8 by
            # `set` below, so don't depend on the platform default.
            with open(cache_filepath, encoding="utf-8") as cache_file:
                return json.load(cache_file)
        except (OSError, ValueError) as exc:
            # A corrupt or unreadable entry is reported as a plain miss.
            LOG.error("error reading TLD cache file %s: %s", cache_filepath, exc)
            raise KeyError("namespace: " + namespace + " key: " + repr(key)) from None

    def set(
        self, namespace: str, key: Union[str, Dict[str, Hashable]], value: object
    ) -> None:
        """Set a value in the disk cache.

        Best-effort: an OSError while creating the directory or writing the
        file is logged (once per process) and swallowed, not raised.
        """
        if not self.enabled:
            return
        cache_filepath = self._key_to_cachefile_path(namespace, key)

        try:
            _make_dir(cache_filepath)
            with open(cache_filepath, "w", encoding="utf-8") as cache_file:
                json.dump(value, cache_file)
        except OSError as ioe:
            global _DID_LOG_UNABLE_TO_CACHE  # pylint: disable=global-statement
            if not _DID_LOG_UNABLE_TO_CACHE:
                LOG.warning(
                    "unable to cache %s.%s in %s. This could refresh the "
                    "Public Suffix List over HTTP every app startup. "
                    "Construct your `TLDExtract` with a writable `cache_dir` or "
                    "set `cache_dir=None` to silence this warning. %s",
                    namespace,
                    key,
                    cache_filepath,
                    ioe,
                )
                _DID_LOG_UNABLE_TO_CACHE = True

    def clear(self) -> None:
        """Clear the disk cache.

        Only removes files carrying this cache's extension (or its ".lock"
        sibling), limiting damage from a misconfigured `cache_dir`.
        """
        for root, _, files in os.walk(self.cache_dir):
            for filename in files:
                if filename.endswith(self.file_ext) or filename.endswith(
                    self.file_ext + ".lock"
                ):
                    try:
                        os.unlink(os.path.join(root, filename))
                    except FileNotFoundError:
                        pass
                    except OSError as exc:
                        # errno.ENOENT == "No such file or directory"
                        # https://docs.python.org/2/library/errno.html#errno.ENOENT
                        if exc.errno != errno.ENOENT:
                            raise

    def _key_to_cachefile_path(
        self, namespace: str, key: Union[str, Dict[str, Hashable]]
    ) -> str:
        """Return the file path that stores `key` within `namespace`."""
        namespace_path = os.path.join(self.cache_dir, namespace)
        hashed_key = _make_cache_key(key)

        cache_path = os.path.join(namespace_path, hashed_key + self.file_ext)

        return cache_path

    def run_and_cache(
        self,
        func: Callable[..., T],
        namespace: str,
        kwargs: Dict[str, Hashable],
        hashed_argnames: Iterable[str],
    ) -> T:
        """Call `func(**kwargs)`, caching the result keyed on `hashed_argnames`.

        A file lock serializes concurrent callers so only one performs the
        (potentially expensive) work; the others read the cached result.
        """
        if not self.enabled:
            return func(**kwargs)

        key_args = {k: v for k, v in kwargs.items() if k in hashed_argnames}
        cache_filepath = self._key_to_cachefile_path(namespace, key_args)
        lock_path = cache_filepath + ".lock"
        try:
            _make_dir(cache_filepath)
        except OSError as ioe:
            global _DID_LOG_UNABLE_TO_CACHE  # pylint: disable=global-statement
            if not _DID_LOG_UNABLE_TO_CACHE:
                LOG.warning(
                    "unable to cache %s.%s in %s. This could refresh the "
                    "Public Suffix List over HTTP every app startup. "
                    "Construct your `TLDExtract` with a writable `cache_dir` or "
                    "set `cache_dir=None` to silence this warning. %s",
                    namespace,
                    key_args,
                    cache_filepath,
                    ioe,
                )
                _DID_LOG_UNABLE_TO_CACHE = True

            # Can't even create the cache dir: fall back to an uncached call.
            return func(**kwargs)

        # Disable lint of 3rd party (see also https://github.com/tox-dev/py-filelock/issues/102)
        # pylint: disable-next=abstract-class-instantiated
        with FileLock(lock_path, timeout=self.lock_timeout):
            try:
                result = cast(T, self.get(namespace=namespace, key=key_args))
            except KeyError:
                result = func(**kwargs)
                self.set(namespace=namespace, key=key_args, value=result)

        return result

    def cached_fetch_url(
        self, session: requests.Session, url: str, timeout: Union[float, int, None]
    ) -> str:
        """Get a url but cache the response."""
        return self.run_and_cache(
            func=_fetch_url,
            namespace="urls",
            kwargs={"session": session, "url": url, "timeout": timeout},
            hashed_argnames=["url"],
        )
def _fetch_url(session: requests.Session, url: str, timeout: Optional[int]) -> str:
    """GET `url` via `session` and return the response body as text.

    HTTP error statuses propagate from `raise_for_status()`. The fallback
    decode guards against a body arriving as bytes rather than str.
    """
    response = session.get(url, timeout=timeout)
    response.raise_for_status()

    body = response.text
    if isinstance(body, str):
        return body
    return str(body, "utf-8")
def _make_cache_key(inputs: Union[str, Dict[str, Hashable]]) -> str:
    """Derive a stable, filename-safe hex key from `inputs`.

    The key material is the `repr` of the input, so strings and dicts of
    hashables alike collapse to a fixed-length md5 digest.
    """
    return md5(repr(inputs).encode("utf8")).hexdigest()
def _make_dir(filename: str) -> None:
    """Make a directory if it doesn't already exist."""
    parent = os.path.dirname(filename)
    if os.path.exists(parent):
        return
    try:
        os.makedirs(parent)
    except OSError as exc:
        # Another process may have created it between the check and the
        # makedirs call; only a non-EEXIST failure is a real error.
        if exc.errno != errno.EEXIST:
            raise