LINUX.ORG.RU

Asyncio: ограничить количество запросов за единицу времени

 ,


0

1
import asyncio
import cgi
import logging
import os
from asyncio import Semaphore
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional, Set
from urllib.parse import urldefrag, urljoin, urlsplit, urlunsplit

from aiohttp import ClientSession, ClientTimeout
from bs4 import BeautifulSoup

__all__ = ('Crawler', 'crawl')

_log = logging.getLogger(__name__.split('.')[0])


class Crawler:
  RATE_LIMIT = 50
  MAX_WORKERS = os.cpu_count()
  ALLOWED_SCHEMES = ('http', 'https')

  def __init__(
    self,
    loop = None,  # Не знаю какой тип указать,
    *,
    session: Optional[ClientSession] = None,
    semaphore: Optional[Semaphore] = None,
    executor: Optional[ThreadPoolExecutor] = None
  ):
    self.loop = loop or asyncio.get_event_loop()
    # DeprecationWarning: The object should be created from async function
    self.session = session or ClientSession(loop=self.loop)
    self.semaphore = semaphore or Semaphore(self.RATE_LIMIT, loop=self.loop)
    self.executor = executor or ThreadPoolExecutor(max_workers=self.MAX_WORKERS)

  async def crawl(
    self,
    url: str,
    cb: Callable,
    *,
    depth: Optional[int] = 3,
    headers: Optional[dict] = {},
    timeout: Optional[float] = 15.0,
  ):
    timeout = ClientTimeout(total=timeout)
    url = self._normalize_url(url)
    await self._fetch(url, cb, depth, headers, timeout, set())

  async def _fetch(
    self,
    url: str,
    cb: Callable,
    depth: int,
    headers: dict,
    timeout: ClientTimeout,
    seen: Set[str],
  ):
    if url not in seen:
      async with self.semaphore:
        try:
          _log.debug('url=%s depth=%s', url, depth)
          async with self.session.get(
            url,
            headers=headers,
            timeout=timeout,
            # allow_redirects=False,
            verify_ssl=False,
          ) as r:
            # Нужно ли нормализовать url или и так схема и netloc в нижнем регистре?
            url = self._normalize_url(str(r.url))
            seen.add(url)
            res = cb(url)
            if asyncio.iscoroutine(res):
              await res

            # Тут может упасть, если у в заголовках ответа Content-Type не будет
            ct, _ = cgi.parse_header(r.headers['Content-Type'])
            if ct == 'text/html' and depth > 1:
              html = await r.text()
              links = await self.loop.run_in_executor(
                self.executor,
                self._extract_links,
                html,
                url
              )
              # _log.debug(links)
              tasks = (
                self._fetch(link, cb, depth - 1, headers, timeout, seen)
                for link in links
              )
              await asyncio.gather(*tasks)
        except Exception as e:
          _log.warn(e)

  def _extract_links(self, html: str, base_url: str) -> set:
    soup = BeautifulSoup(html, 'lxml')
    base = soup.find('base', href=True)
    if base:
      # Вроде как относительным может быть
      base_url = urljoin(base_url, base['href'])

    rv = set()
    for link in soup.find_all('a', href=True):
      url = urljoin(base_url, link['href'])
      if urlsplit(url).scheme in self.ALLOWED_SCHEMES:
        rv.add(self._normalize_url(url))

    return rv

  def _normalize_url(self, url: str) -> str:
    scheme, netloc, path, query, _ = urlsplit(url)
    return urlunsplit((scheme, netloc.lower(), path, query, ''))


async def crawl(*args, **kwargs):
  await Crawler().crawl(*args, **kwargs)


if __name__ == '__main__':
  from argparse import ArgumentParser
  logging.basicConfig(level=logging.DEBUG)
  parser = ArgumentParser()
  parser.add_argument('url', help='URL')
  parser.add_argument('-d', '--depth', default=3, type=int, help='Depth')
  parser.add_argument('-H', '--header', default=[], action='append', help='Header: Value')
  parser.add_argument('-t', '--timeout', default=15.0, type=float, help='Timeout')
  args = parser.parse_args()
  _log.debug(args.header)
  headers = dict(map(str.strip, v.split(':')) for v in args.header)
  asyncio.get_event_loop().run_until_complete(crawl(
    args.url,
    print,
    depth=args.depth,
    headers=headers,
  ))

Запускаю скрипт:

python crawler.py http://habr.com/ -H "User-Agent: Mozilla/5.0"

До второго уровня доходит и повисает. Что делать в душе не е*у. Есть ли те кто глубоко разбирается в asyncio?

тут очень важно ограничить количество запросов, потому как у линукса органие в 1024 открытых файла (в число файлов входят и сокеты). Эту настройку можно изменить, но я все равно хочу чтобы не больше 50 запросов обрабатывались параллельно.

tz4678 ()

По-мелочи могу только сказать

    depth: Optional[int] = 3,
    timeout: Optional[float] = 15.0,
Optional тут лишний

    headers: Optional[dict] = {},

Мутабельные объекты не стоит делать значениями по умолчанию. Надо так:

    headers: Optional[dict] = None,
...
if headers is None:
    headers = {}

anonymous ()
Ответ на: комментарий от vvn_black
import asyncio
import cgi
import logging
import os
from asyncio import Semaphore
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional, Set
from urllib.parse import urldefrag, urljoin, urlsplit, urlunsplit

from aiohttp import ClientSession, ClientTimeout
from bs4 import BeautifulSoup

__all__ = ('Crawler', 'crawl')

_log = logging.getLogger(__name__.split('.')[0])


class Crawler:
  RATE_LIMIT = 50
  MAX_WORKERS = os.cpu_count()
  ALLOWED_SCHEMES = ('http', 'https')

  def __init__(
    self,
    loop = None,  # Не знаю какой тип указать,
    *,
    session: Optional[ClientSession] = None,
    semaphore: Optional[Semaphore] = None,
    executor: Optional[ThreadPoolExecutor] = None,
  ):
    self.loop = loop or asyncio.get_event_loop()
    # DeprecationWarning: The object should be created from async function
    self.session = session or ClientSession(loop=self.loop)
    self.semaphore = semaphore or Semaphore(self.MAX_WORKERS, loop=self.loop)
    self.executor = executor or ThreadPoolExecutor(max_workers=self.MAX_WORKERS)

  async def crawl(
    self,
    url: str,
    cb: Callable,
    *,
    depth: Optional[int] = 3,
    headers: Optional[dict] = None,
    timeout: Optional[float] = 15.0,
    return_body: Optional[bool] = True,
  ):
    url = self._normalize_url(url)
    timeout = ClientTimeout(total=timeout)
    # Асинхронные очереди тут не подходят
    queue = [(url, depth)]
    visited = set()
    domain = urlsplit(url).netloc
    while queue:
      try:
        async with self.semaphore:
          url, depth = queue.pop(0)
          _log.debug('url=%s depth=%s', url, depth)
          async with self.session.get(
              url,
              headers=headers,
              timeout=timeout,
              allow_redirects=False,
              verify_ssl=False,
            ) as r:
              visited.add(url)
              assert 300 > r.status >= 200, 'Bad Status'
              body = None
              if return_body:
                try:
                  body = await r.text()
                except:
                  body = await r.read()

              res = cb(url, r.headers, body)
              if asyncio.iscoroutine(res):
                await res

              ct, _ = cgi.parse_header(r.headers['Content-Type'])
              if ct == 'text/html' and depth > 0:
                html = await r.text()
                urls = await self.loop.run_in_executor(
                  self.executor,
                  self._extract_links,
                  html,
                  url
                )
                # Исключаем посещенные
                urls -= visited
                # _log.debug(urls)
                for u in urls:
                  # Обрабатываем только url текущего сайта
                  if urlsplit(u).netloc == domain:
                    queue.append((u, depth - 1))
      except Exception as e:
        _log.warning(str(e))

  def _extract_links(self, html: str, base_url: str) -> set:
    soup = BeautifulSoup(html, 'lxml')
    base = soup.find('base', href=True)
    if base:
      # Вроде как относительным может быть
      base_url = urljoin(base_url, base['href'])

    rv = set()
    for link in soup.find_all('a', href=True):
      url = urljoin(base_url, link['href'])
      if urlsplit(url).scheme in self.ALLOWED_SCHEMES:
        rv.add(self._normalize_url(url))

    return rv

  def _normalize_url(self, url: str) -> str:
    scheme, netloc, path, query, _ = urlsplit(url)
    return urlunsplit((scheme, netloc.lower(), path, query, ''))


async def crawl(*args, **kwargs):
  await Crawler().crawl(*args, **kwargs)


if __name__ == '__main__':
  from argparse import ArgumentParser
  logging.basicConfig(level=logging.DEBUG)
  parser = ArgumentParser()
  parser.add_argument('url', help='URL')
  parser.add_argument('-d', '--depth', default=3, type=int, help='Depth')
  parser.add_argument('-H', '--header', default=[], action='append', help='Header: Value')
  parser.add_argument('-t', '--timeout', default=15.0, type=float, help='Timeout')
  args = parser.parse_args()
  _log.debug(args)
  headers = dict(map(str.strip, v.split(':')) for v in args.header)
  def handler(url, headers, body):
    _log.info('%s %s %s', url, headers, body[:500])
  asyncio.get_event_loop().run_until_complete(crawl(
    args.url,
    handler,
    depth=args.depth,
    headers=headers,
  ))

переписал. протестировал на этом сайте пока он не стал разрывать соединения. вроде работает.

tz4678 ()