Usage

Most users will only need to call careful.httpx.make_careful_client.
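For example (a sketch of a typical call; the parameters are described below):

from careful.httpx import make_careful_client

# retries plus throttling in a single call
client = make_careful_client(retry_attempts=3, requests_per_minute=30)
response = client.get("https://example.com")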

careful.httpx.make_careful_client

This function patches an httpx.Client so that all requests made with the client support retries, throttling, and development caching.

Parameters:

client (Client | None, default: None)
    A pre-configured httpx.Client. If omitted, a default client will be created.

retry_attempts (int, default: 0)
    Maximum number of retries. If non-zero, will retry up to this many times with increasing wait times, starting with retry_wait_seconds.

retry_wait_seconds (float, default: 10)
    Number of seconds to sleep between the first attempt and the first retry. Subsequent waits increase exponentially (2x, 4x, 8x, etc.).

should_retry (ResponsePredicate, default: retry_default_rule)
    Predicate function that takes an httpx.Response and returns True if it should be retried.

requests_per_minute (int, default: 0)
    Maximum number of requests per minute (e.g. 30 will throttle to ~2s between requests).

cache_storage (CacheStorage | None, default: None)
    An object that implements the cache storage interface.

cache_write_only (bool, default: False)
    Update the cache, but never read from it.

should_cache (ResponsePredicate, default: _cache_200s)
    Predicate function that takes an httpx.Response and returns True if it should be cached.

cache_keyfunc (CacheKeyfunc, default: _default_keyfunc)
    Function that takes request details and returns a unique cache key.
Source code in src/careful/httpx/__init__.py
def make_careful_client(
    *,
    client: Client | None = None,
    retry_attempts: int = 0,
    retry_wait_seconds: float = 10,
    should_retry: ResponsePredicate = retry_default_rule,
    requests_per_minute: int = 0,
    cache_storage: CacheStorage | None = None,
    cache_write_only: bool = False,
    should_cache: ResponsePredicate = _cache_200s,
    cache_keyfunc: CacheKeyfunc = _default_keyfunc,
) -> Client:
    """
    This function patches an `httpx.Client` so that all requests made with the client support
     [retries](#retries), [throttling](#throttling), and [development caching](#development-caching).


    Parameters:
        client: A pre-configured `httpx.Client`. If omitted a default client will be created.

        retry_attempts: Maximum number of retries. If non-zero will retry up to this many times
                         with increasing wait times, starting with `retry_wait_seconds`.

        retry_wait_seconds: Number of seconds to sleep between first attempt and first retry.
                             Subsequent attempts will increase exponentially (2x, 4x, 8x, etc.)

        should_retry: Predicate function that takes a `httpx.Response` and returns `True` if it should be retried.

        requests_per_minute: Maximum number of requests per minute. (e.g. 30 will throttle to ~2s between requests)

        cache_storage: An object that implements the [cache storage interface](#cache-storage).

        cache_write_only: Update cache, but never read from it.

        should_cache: Predicate function that takes a `httpx.Response` and returns `True` if it should be cached.

        cache_keyfunc: Function that takes request details and returns a unique cache key.

    """
    if client is None:
        client = Client()
    # order matters, retry on inside b/c it is last-chance scenario
    if retry_attempts:
        client = make_retry_client(
            client=client,
            attempts=retry_attempts,
            wait_seconds=retry_wait_seconds,
            should_retry=should_retry,
        )
    # throttling around retries
    if requests_per_minute:
        client = make_throttled_client(
            client=client, requests_per_minute=requests_per_minute
        )
    # caching on top layer, so cache will be checked first
    if cache_storage:
        client = make_dev_caching_client(
            client=client,
            cache_storage=cache_storage,
            cache_keyfunc=cache_keyfunc,
            should_cache=should_cache,
            write_only=cache_write_only,
        )

    return client

Throttling

If requests_per_minute is set, standard (non-retry) requests will automatically sleep for a short period to target the given rate.

For example, at 30rpm, the sleep time on a fast request will be close to 2 seconds.

client = make_careful_client(requests_per_minute=20)

for page in range(10):
    # will sleep ~3 seconds each time
    client.get(f"https://example.com?page={page}")

Retries

If retry_attempts is set, each response is passed to should_retry. Responses for which it returns True are retried after a wait based on retry_wait_seconds, and each retry waits twice as long as the one before.

client = make_careful_client(retry_attempts=2, retry_wait_seconds=30)

# will try, wait 30s, try again, wait 60s, try again, then give up & return the 500
client.get("https://httpbin.org/status/500")

The default rule, retry_default_rule, can be replaced with any predicate that takes an httpx.Response and returns True when the response should be retried.
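A minimal sketch of a custom rule (retry_on_server_error is an illustrative name, not part of the library):

import httpx

from careful.httpx import make_careful_client

# illustrative predicate: retry only on 5xx responses
def retry_on_server_error(response: httpx.Response) -> bool:
    return response.status_code >= 500

client = make_careful_client(
    retry_attempts=3,
    retry_wait_seconds=5,
    should_retry=retry_on_server_error,
)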

Development Caching

Why development caching?

This feature is named as a reminder that this is not true HTTP caching, which should take various headers into account. Look at libraries like hishel if that's what you are after.

The purpose of this feature is to allow you to cache all of your HTTP requests during development. When writing a scraper or crawler, you often wind up hitting the site you are working on more than you'd like: each time you iterate on your code, you are likely making redundant requests to pages that haven't changed.

By caching all successful requests (configurable with the should_cache parameter), you can easily re-run scrapers without making redundant HTTP requests. This means much faster development & happier upstream servers.
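For example, a custom predicate can widen what gets cached (a sketch; cache_ok_and_redirects is an illustrative name, _cache_200s presumably keeps only 200 responses judging by its name, and FileCache is described below):

import httpx

# illustrative predicate: also cache redirect responses
def cache_ok_and_redirects(response: httpx.Response) -> bool:
    return response.status_code < 400

client = make_careful_client(
    cache_storage=FileCache("_cache"),
    should_cache=cache_ok_and_redirects,
)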

To enable development caching, pass a MemoryCache, FileCache, or SqliteCache as the cache_storage argument to make_careful_client.

client = make_careful_client(
    cache_storage=FileCache("_cache")
)

# only one HTTP request is made
client.get("https://example.com")
client.get("https://example.com")
client.get("https://example.com")
client.get("https://example.com")
# on subsequent runs, zero will be made until _cache is cleared
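To refresh a cache without ever serving stale responses, set cache_write_only; every request hits the network, but responses are still stored. A minimal sketch:

client = make_careful_client(
    cache_storage=FileCache("_cache"),
    cache_write_only=True,
)

# always fetched fresh, but the cached copy is updated each time
client.get("https://example.com")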

Multiple Enhancements

When multiple features are applied, the order of wrapping ensures that:

  • the cache is checked first, and bypasses throttling if hit
  • retries use their own delays, but are not throttled separately
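For example, with all three features enabled (a sketch; the cache directory name is arbitrary):

client = make_careful_client(
    retry_attempts=2,
    retry_wait_seconds=10,
    requests_per_minute=30,
    cache_storage=FileCache("_cache"),
)

# a cache miss is throttled, retried if needed, then stored;
# a cache hit returns immediately, with no throttling or retries
client.get("https://example.com")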

Cache Storage Options

These options are available for cache_storage:

careful.httpx.MemoryCache

Bases: CacheStorage

In memory cache for request responses.

Example:

make_careful_client(
    cache_storage=MemoryCache(),
)
Source code in src/careful/httpx/dev_cache.py
class MemoryCache(CacheStorage):
    """
    In memory cache for request responses.

    Example:

        make_careful_client(
            cache_storage=MemoryCache(),
        )

    """

    def __init__(self) -> None:
        self.cache: dict[str, Response] = {}

    def get(self, key: str) -> None | Response:
        """Get cache entry for key, or return None."""
        return self.cache.get(key, None)

    def set(self, key: str, response: Response) -> None:
        """Set cache entry for key with contents of response."""
        self.cache[key] = response

careful.httpx.FileCache

Bases: CacheStorage

File-based cache for request responses.

Parameters:

cache_dir (str, required)
    Directory for storing responses.

Example:

make_careful_client(
    cache_storage=FileCache("_httpcache/"),
)
Source code in src/careful/httpx/dev_cache.py
class FileCache(CacheStorage):
    """
    File-based cache for request responses.

    Parameters:
        cache_dir: directory for storing responses

    Example:

        make_careful_client(
            cache_storage=FileCache("_httpcache/"),
        )

    """

    # TODO: restore?
    # check_last_modified:  set to True to compare last-modified
    #                       timestamp in cached response with value from HEAD request

    # file name escaping inspired by httplib2
    _prefix = re.compile(r"^\w+://")
    _illegal = re.compile(r"[?/:|]+")
    _header_re = re.compile(r"([-\w]+): (.*)")
    _maxlen = 200

    def _clean_key(self, key: str) -> str:
        # strip scheme
        md5 = hashlib.md5(key.encode("utf8")).hexdigest()
        key = self._prefix.sub("", key)
        key = self._illegal.sub(",", key)
        return ",".join((key[: self._maxlen], md5))

    def __init__(self, cache_dir: str, check_last_modified: bool = False):
        # normalize path
        self.cache_dir = os.path.join(os.getcwd(), cache_dir)
        self.check_last_modified = check_last_modified
        # create directory
        if not os.path.isdir(self.cache_dir):
            os.makedirs(self.cache_dir)

    def get(self, orig_key: str) -> None | Response:
        """Get cache entry for key, or return None."""
        key = self._clean_key(orig_key)
        path = os.path.join(self.cache_dir, key)
        resp_headers = {}

        try:
            with open(path, "rb") as f:
                # read lines one at a time
                while True:
                    line = f.readline().decode("utf8").strip("\r\n")
                    # set headers

                    # if self.check_last_modified and re.search(
                    #     "last-modified", line, flags=re.I
                    # ):
                    #     # line contains last modified header
                    #     head_resp = requests.head(orig_key)

                    #     try:
                    #         new_lm = head_resp.headers["last-modified"]
                    #         old_lm = line[line.find(":") + 1 :].strip()
                    #         if old_lm != new_lm:
                    #             # last modified timestamps don't match, need to download again
                    #             return None
                    #     except KeyError:
                    #         # no last modified header present, so redownload
                    #         return None

                    header = self._header_re.match(line)
                    if header:
                        resp_headers[header.group(1)] = header.group(2)
                    else:
                        break
                # everything left is the real content
                resp_content = f.read()

            # status & encoding will be in headers, but are faked
            # need to split spaces out of status to get code (e.g. '200 OK')
            resp = Response(
                status_code=int(resp_headers.pop("status").split(" ")[0]),
                content=resp_content,
                default_encoding=resp_headers.pop("encoding"),
                headers=resp_headers,
            )
            return resp
        except IOError:
            return None

    def set(self, key: str, response: Response) -> None:
        """Set cache entry for key with contents of response."""
        key = self._clean_key(key)
        path = os.path.join(self.cache_dir, key)

        with open(path, "wb") as f:
            status_str = "status: {0}\n".format(response.status_code)
            f.write(status_str.encode("utf8"))
            encoding_str = "encoding: {0}\n".format(response.encoding)
            f.write(encoding_str.encode("utf8"))
            for h, v in response.headers.items():
                # header: value\n
                f.write(h.encode("utf8"))
                f.write(b": ")
                f.write(v.encode("utf8"))
                f.write(b"\n")
            # one blank line
            f.write(b"\n")
            f.write(response.content)

    def clear(self) -> None:
        # only delete things that end w/ a md5, less dangerous this way
        cache_glob = "*," + ("[0-9a-f]" * 32)
        for fname in glob.glob(os.path.join(self.cache_dir, cache_glob)):
            os.remove(fname)

careful.httpx.SqliteCache

Bases: CacheStorage

SQLite cache for request responses.

Parameters:

cache_path (str, required)
    Path for the SQLite database file.

Example:

make_careful_client(
    cache_storage=SqliteCache("_cache.db"),
)
Source code in src/careful/httpx/dev_cache.py
class SqliteCache(CacheStorage):
    """
    SQLite cache for request responses.

    Parameters:
        cache_path: path for SQLite database file

    Example:

        make_careful_client(
            cache_storage=SqliteCache("_cache.db"),
        )
    """

    _columns = ["key", "status", "modified", "encoding", "data", "headers"]

    def __init__(self, cache_path: str, check_last_modified: bool = False):
        self.cache_path = cache_path
        self.check_last_modified = check_last_modified
        self._conn = sqlite3.connect(cache_path)
        self._conn.text_factory = str
        self._build_table()

    def _build_table(self) -> None:
        """Create table for storing request information and response."""
        self._conn.execute(
            """CREATE TABLE IF NOT EXISTS cache
                (key text UNIQUE, status integer, modified text,
                 encoding text, data blob, headers blob)"""
        )

    def set(self, key: str, response: Response) -> None:
        """Set cache entry for key with contents of response."""
        mod = response.headers.pop("last-modified", None)
        status = int(response.status_code)
        rec = (
            key,
            status,
            mod,
            response.encoding,
            response.content,
            json.dumps(dict(response.headers)),
        )
        with self._conn:
            self._conn.execute("DELETE FROM cache WHERE key=?", (key,))
            self._conn.execute("INSERT INTO cache VALUES (?,?,?,?,?,?)", rec)

    def get(self, key: str) -> None | Response:
        """Get cache entry for key, or return None."""
        query = self._conn.execute("SELECT * FROM cache WHERE key=?", (key,))
        rec = query.fetchone()
        if rec is None:
            return None
        rec = dict(zip(self._columns, rec))

        # TODO evaluate/remove?
        # if self.check_last_modified:
        #     if rec["modified"] is None:
        #         return None  # no last modified header present, so redownload

        #     head_resp = requests.head(key)
        #     new_lm = head_resp.headers.get("last-modified", None)
        #     if rec["modified"] != new_lm:
        #         return None

        resp = Response(
            rec["status"],
            content=rec["data"],
            default_encoding=rec["encoding"],
            headers=json.loads(rec["headers"]),
        )
        return resp

    def clear(self) -> None:
        """Remove all records from cache."""
        with self._conn:
            self._conn.execute("DELETE FROM cache")

    def __del__(self) -> None:
        self._conn.close()
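The storage interface itself is small: get(key) returns a cached httpx.Response or None, and set(key, response) stores one (clear is optional). A sketch of a custom backend, assuming CacheStorage is importable from careful.httpx like the classes above:

from collections import OrderedDict

from httpx import Response

from careful.httpx import CacheStorage  # assumed import path

class LRUMemoryCache(CacheStorage):
    """Illustrative in-memory cache that evicts the least recently used entry."""

    def __init__(self, max_size: int = 100) -> None:
        self.max_size = max_size
        self.cache: OrderedDict[str, Response] = OrderedDict()

    def get(self, key: str) -> None | Response:
        """Get cache entry for key, or return None."""
        if key in self.cache:
            self.cache.move_to_end(key)  # mark as recently used
            return self.cache[key]
        return None

    def set(self, key: str, response: Response) -> None:
        """Set cache entry for key, evicting the oldest entry when over capacity."""
        self.cache[key] = response
        self.cache.move_to_end(key)
        if len(self.cache) > self.max_size:
            self.cache.popitem(last=False)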