Source code for pypi_simple.html_stream

from __future__ import annotations
from codecs import getincrementaldecoder
from collections.abc import Iterable, Iterator
from html.parser import HTMLParser
from itertools import chain
from typing import AnyStr, Optional, cast
from urllib.parse import urljoin
from bs4.dammit import EncodingDetector
import requests
from .html import Link
from .util import check_repo_version

# Names of tags that `LinkParser.handle_starttag()` never pushes onto its stack
# of open tags.  The list is taken from the BeautifulSoup4 source; entries are
# kept in alphabetical order.
EMPTY_TAGS = {
    "area",
    "base",
    "basefont",
    "bgsound",
    "br",
    "col",
    "command",
    "embed",
    "frame",
    "hr",
    "image",
    "img",
    "input",
    "isindex",
    "keygen",
    "link",
    "menuitem",
    "meta",
    "nextid",
    "param",
    "source",
    "spacer",
    "track",
    "wbr",
}


class LinkParser(HTMLParser):
    def __init__(self, base_url: Optional[str] = None) -> None:
        super().__init__(convert_charrefs=True)
        self.base_url: Optional[str] = base_url
        self.base_seen = False
        self.tag_stack: list[str] = []
        self.finished_links: list[Link] = []
        self.link_tag_stack: list[dict[str, str]] = []

    def fetch_links(self) -> list[Link]:
        links = self.finished_links
        self.finished_links = []
        return links

    def handle_starttag(self, tag: str, attrs: list[tuple[str, Optional[str]]]) -> None:
        if tag not in EMPTY_TAGS:
            self.tag_stack.append(tag)
        attrdict = {k: v or "" for k, v in attrs}
        if tag == "base" and "href" in attrdict and not self.base_seen:
            if self.base_url is None:
                self.base_url = attrdict["href"]
            else:
                self.base_url = urljoin(self.base_url, attrdict["href"])
            self.base_seen = True
        elif tag == "a":
            attrdict["#text"] = ""
            self.link_tag_stack.append(attrdict)
        elif (
            tag == "meta"
            and attrdict.get("name") == "pypi:repository-version"
            and "content" in attrdict
        ):
            check_repo_version(attrdict["content"])

    def handle_endtag(self, tag: str) -> None:
        for i in range(len(self.tag_stack) - 1, -1, -1):
            if self.tag_stack[i] == tag:
                for t in self.tag_stack[i:]:
                    if t == "a":
                        self.end_link_tag()
                del self.tag_stack[i:]
                break

    def end_link_tag(self) -> None:
        attrs = self.link_tag_stack.pop()
        if "href" in attrs:
            text = attrs.pop("#text")
            if self.base_url is not None:
                url = urljoin(self.base_url, attrs["href"])
            else:
                url = attrs["href"]
            self.finished_links.append(
                Link(
                    text=text.strip(),
                    url=url,
                    attrs=cast("dict[str, str | list[str]]", attrs),
                )
            )

    def handle_data(self, data: str) -> None:
        for link in self.link_tag_stack:
            link["#text"] += data

    def close(self) -> None:
        while self.link_tag_stack:
            self.handle_endtag("a")
        super().close()










def iterhtmldecode(
    iterable: Iterable[AnyStr],
    http_charset: Optional[str] = None,
    default_encoding: str = "cp1252",
    errors: str = "replace",
    scan_window: int = 1024,
) -> Iterator[str]:
    """
    Decode an HTML document that is supplied as an iterable of `bytes` chunks,
    yielding the decoded text one chunk at a time.  Only as much of the
    iterable as is needed to work out the document's encoding is consumed up
    front.

    For convenience, an iterable of `str` objects is also accepted; its
    elements are yielded as-is.

    The encoding is chosen using a greatly simplified version of the WHATWG's
    `"Determining the Character Encoding" specification <encspec_>`_.  The
    sources below are consulted in the order given, and the first one that
    provides an encoding wins:

    - byte-order mark
    - HTTP charset
    - encoding declared in document
    - default encoding

    .. _encspec:
       https://html.spec.whatwg.org/multipage/parsing.html
       #determining-the-character-encoding

    :param Iterable[AnyStr] iterable: an iterable of either `bytes` or `str`
        whose elements, joined together, make up an HTML document
    :param Optional[str] http_charset: the encoding announced for the document
        by the transport layer, if any; e.g., the ``charset`` parameter of the
        :mailheader:`Content-Type` header of the HTTP response that delivered
        the document
    :param str default_encoding: the encoding to use when no other source
        yields one; defaults to CP1252
    :param str errors: the error handler to use while decoding; defaults to
        ``"replace"``
    :param int scan_window: the number of bytes to read from the iterable when
        looking for an encoding declaration
    :rtype: Iterator[str]
    """

    # UnicodeDammit is not suitable here for two reasons: (a) it ranks the
    # encoding from the `Content-Type` header above the one indicated by the
    # BOM, which is the wrong way around, and (b) it expects to be handed the
    # complete document and fails when the blob it is given stops partway
    # through a multibyte character.

    chunks = iter(iterable)
    try:
        head = next(chunks)
    except StopIteration:
        nothing: list[str] = []
        return iter(nothing)

    # Already-decoded input is passed straight through.
    if isinstance(head, str):
        return chain([head], chunks)

    # Keep pulling chunks until the scan window is filled or the input ends.
    if len(head) < scan_window:
        for more in chunks:
            head += more
            if len(head) >= scan_window:
                break

    head, bom_encoding = EncodingDetector.strip_byte_order_mark(head)
    encoding: Optional[str] = bom_encoding
    if encoding is None:
        encoding = http_charset
    if encoding is None:
        encoding = EncodingDetector.find_declared_encoding(
            head,
            is_html=True,
            search_entire_document=True,
        )
    if encoding is None:
        encoding = default_encoding
    assert isinstance(encoding, str)
    return iterdecode(chain([head], chunks), encoding, errors=errors)


def iterdecode(
    iterable: Iterable[bytes],
    encoding: str,
    errors: str = "strict",
) -> Iterator[str]:
    """
    Incrementally decode a document that is split across an iterable of
    `bytes`, yielding one decoded string per input element followed by one
    final string for whatever the decoder produces at end of input.

    :param Iterable[bytes] iterable: the bytes to decode
    :param str encoding: the encoding to decode from
    :param str errors: the error handler to use
    :rtype: Iterator[str]
    """
    decode = getincrementaldecoder(encoding)(errors=errors).decode
    yield from map(decode, iterable)
    # Signal end of input so that the decoder can deal with any bytes it is
    # still holding on to.
    yield decode(b"", True)