"""Helpers."""
|
|
import errno
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import os.path
|
|
import sys
|
|
from hashlib import md5
|
|
from typing import (
|
|
Callable,
|
|
Dict,
|
|
Hashable,
|
|
Iterable,
|
|
Optional,
|
|
TypeVar,
|
|
Union,
|
|
cast,
|
|
)
|
|
|
|
from filelock import FileLock
|
|
import requests
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
_DID_LOG_UNABLE_TO_CACHE = False
|
|
|
|
T = TypeVar("T") # pylint: disable=invalid-name
|
|
|
|
|
|
def get_pkg_unique_identifier() -> str:
    """
    Generate an identifier unique to the Python version, tldextract version, and Python instance.

    This will prevent interference between virtualenvs and issues that might arise when installing
    a new version of tldextract.
    """
    try:
        # pylint: disable=import-outside-toplevel
        from tldextract._version import version
    except ImportError:
        version = "dev"

    tldextract_version = "tldextract-" + version
    python_env_name = os.path.basename(sys.prefix)
    # just to handle the edge case of two identically named python environments
    python_binary_path_short_hash = hashlib.md5(sys.prefix.encode("utf-8")).hexdigest()[
        :6
    ]
    python_version = ".".join([str(v) for v in sys.version_info[:-1]])
    identifier_parts = [
        python_version,
        python_env_name,
        python_binary_path_short_hash,
        tldextract_version,
    ]
    pkg_identifier = "__".join(identifier_parts)

    return pkg_identifier


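# Resolution order for the cache directory chosen below: the TLDEXTRACT_CACHE
# environment variable wins, then $XDG_CACHE_HOME/python-tldextract/<pkg
# identifier>, then $HOME/.cache/python-tldextract/<pkg identifier>, and
# finally a ".suffix_cache/" directory alongside this module.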
def get_cache_dir() -> str:
    """
    Get a cache dir that we have permission to write to.

    Try to follow the XDG standard, but if that doesn't work fall back to the package directory
    http://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html
    """
    cache_dir = os.environ.get("TLDEXTRACT_CACHE", None)
    if cache_dir is not None:
        return cache_dir

    xdg_cache_home = os.getenv("XDG_CACHE_HOME", None)
    if xdg_cache_home is None:
        user_home = os.getenv("HOME", None)
        if user_home:
            xdg_cache_home = os.path.join(user_home, ".cache")

    if xdg_cache_home is not None:
        return os.path.join(
            xdg_cache_home, "python-tldextract", get_pkg_unique_identifier()
        )

    # fall back to trying to use the package directory itself
    return os.path.join(os.path.dirname(__file__), ".suffix_cache/")


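# On disk, each cache entry is a small JSON file laid out as
# <cache_dir>/<namespace>/<md5 of repr(key)>.tldextract.json; a companion
# ".lock" file with the same name is used to serialize concurrent fetches.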
class DiskCache:
    """Disk cache that only works for jsonable values."""

    def __init__(self, cache_dir: Optional[str], lock_timeout: int = 20):
        self.enabled = bool(cache_dir)
        self.cache_dir = os.path.expanduser(str(cache_dir or ""))
        self.lock_timeout = lock_timeout
        # using a unique extension provides some safety that an incorrectly set cache_dir
        # combined with a call to `.clear()` won't wipe someone's hard drive
        self.file_ext = ".tldextract.json"

    def get(self, namespace: str, key: Union[str, Dict[str, Hashable]]) -> object:
        """Retrieve a value from the disk cache."""
        if not self.enabled:
            raise KeyError("Cache is disabled")
        cache_filepath = self._key_to_cachefile_path(namespace, key)

        if not os.path.isfile(cache_filepath):
            raise KeyError("namespace: " + namespace + " key: " + repr(key))
        try:
            # pylint: disable-next=unspecified-encoding
            with open(cache_filepath) as cache_file:
                return json.load(cache_file)
        except (OSError, ValueError) as exc:
            LOG.error("error reading TLD cache file %s: %s", cache_filepath, exc)
            raise KeyError("namespace: " + namespace + " key: " + repr(key)) from None

    def set(
        self, namespace: str, key: Union[str, Dict[str, Hashable]], value: object
    ) -> None:
        """Set a value in the disk cache."""
        if not self.enabled:
            return

        cache_filepath = self._key_to_cachefile_path(namespace, key)

        try:
            _make_dir(cache_filepath)
            # pylint: disable-next=unspecified-encoding
            with open(cache_filepath, "w") as cache_file:
                json.dump(value, cache_file)
        except OSError as ioe:
            global _DID_LOG_UNABLE_TO_CACHE  # pylint: disable=global-statement
            if not _DID_LOG_UNABLE_TO_CACHE:
                LOG.warning(
                    "unable to cache %s.%s in %s. This could refresh the "
                    "Public Suffix List over HTTP every app startup. "
                    "Construct your `TLDExtract` with a writable `cache_dir` or "
                    "set `cache_dir=None` to silence this warning. %s",
                    namespace,
                    key,
                    cache_filepath,
                    ioe,
                )
                _DID_LOG_UNABLE_TO_CACHE = True

    def clear(self) -> None:
        """Clear the disk cache."""
        for root, _, files in os.walk(self.cache_dir):
            for filename in files:
                if filename.endswith(self.file_ext) or filename.endswith(
                    self.file_ext + ".lock"
                ):
                    try:
                        os.unlink(os.path.join(root, filename))
                    except FileNotFoundError:
                        pass
                    except OSError as exc:
                        # errno.ENOENT == "No such file or directory"
                        # https://docs.python.org/2/library/errno.html#errno.ENOENT
                        if exc.errno != errno.ENOENT:
                            raise

    def _key_to_cachefile_path(
        self, namespace: str, key: Union[str, Dict[str, Hashable]]
    ) -> str:
        """Get the cache file path for a given namespace and key."""
        namespace_path = os.path.join(self.cache_dir, namespace)
        hashed_key = _make_cache_key(key)

        cache_path = os.path.join(namespace_path, hashed_key + self.file_ext)

        return cache_path

    def run_and_cache(
        self,
        func: Callable[..., T],
        namespace: str,
        kwargs: Dict[str, Hashable],
        hashed_argnames: Iterable[str],
    ) -> T:
        """Run `func` with `kwargs`, caching its JSON-serializable result on disk."""
        if not self.enabled:
            return func(**kwargs)

        key_args = {k: v for k, v in kwargs.items() if k in hashed_argnames}
        cache_filepath = self._key_to_cachefile_path(namespace, key_args)
        lock_path = cache_filepath + ".lock"
        try:
            _make_dir(cache_filepath)
        except OSError as ioe:
            global _DID_LOG_UNABLE_TO_CACHE  # pylint: disable=global-statement
            if not _DID_LOG_UNABLE_TO_CACHE:
                LOG.warning(
                    "unable to cache %s.%s in %s. This could refresh the "
                    "Public Suffix List over HTTP every app startup. "
                    "Construct your `TLDExtract` with a writable `cache_dir` or "
                    "set `cache_dir=None` to silence this warning. %s",
                    namespace,
                    key_args,
                    cache_filepath,
                    ioe,
                )
                _DID_LOG_UNABLE_TO_CACHE = True

            return func(**kwargs)

        # Disable lint of 3rd party (see also https://github.com/tox-dev/py-filelock/issues/102)
        # pylint: disable-next=abstract-class-instantiated
        with FileLock(lock_path, timeout=self.lock_timeout):
            try:
                result = cast(T, self.get(namespace=namespace, key=key_args))
            except KeyError:
                result = func(**kwargs)
                self.set(namespace=namespace, key=key_args, value=result)

            return result

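    # Note for the wrapper below: only `url` participates in the cache key
    # (`hashed_argnames=["url"]`), so differing `session` or `timeout`
    # arguments do not create separate cache entries.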
    def cached_fetch_url(
        self, session: requests.Session, url: str, timeout: Union[float, int, None]
    ) -> str:
        """Get a url but cache the response."""
        return self.run_and_cache(
            func=_fetch_url,
            namespace="urls",
            kwargs={"session": session, "url": url, "timeout": timeout},
            hashed_argnames=["url"],
        )


def _fetch_url(session: requests.Session, url: str, timeout: Optional[int]) -> str:
    """Fetch a url and return the response body as text."""
    response = session.get(url, timeout=timeout)
    response.raise_for_status()
    text = response.text

    if not isinstance(text, str):
        text = str(text, "utf-8")

    return text


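# Cache keys are the md5 hex digest of `repr(inputs)`, so two lookups only hit
# the same file when their keys have identical reprs.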
def _make_cache_key(inputs: Union[str, Dict[str, Hashable]]) -> str:
    key = repr(inputs)
    return md5(key.encode("utf8")).hexdigest()


def _make_dir(filename: str) -> None:
    """Make a directory if it doesn't already exist."""
    if not os.path.exists(os.path.dirname(filename)):
        try:
            os.makedirs(os.path.dirname(filename))
        except OSError as exc:  # Guard against race condition
            if exc.errno != errno.EEXIST:
                raise
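

# Illustrative usage sketch (assumes a writable cache dir and network access;
# the URL is only an example, not necessarily what tldextract itself fetches):
#
#     cache = DiskCache(cache_dir=get_cache_dir())
#     psl_text = cache.cached_fetch_url(
#         session=requests.Session(),
#         url="https://publicsuffix.org/list/public_suffix_list.dat",
#         timeout=30,
#     )
#
# Repeated calls with the same `url` are served from the JSON file on disk;
# constructing `DiskCache(cache_dir=None)` disables caching and always calls
# the wrapped function.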