LINUX.ORG.RU

6
Всего сообщений: 31

asyncинкование в python

в перемешку с asyncом в коде встречаются такие конструкции:

res = [bla_bla_func(key) for key in [1,2,3,4]]

Вопрос след: имеет ли смысл все подобные конструкции менять на

res = await asyncio.gather(map(bla_bla_func, [1,2,3,4]))

?

 

Jopich1 ()

критик реквест

Разродился вот этим:

async def wait_event(add_callback, del_callback):                                                    
     ev = asyncio.Event()                                                                             
     def callback(*args, **kwargs):                                                                   
         ev.set()                                                                                     
     add_callback(callback)                                                                           
     await ev.wait()                                                                                  
     del_callback(callback)

...

await wait_event(my_class.add_some_event_listener, my_class.del_some_event_listener)

my_class - библиотечный класс из того же проекта. Переводить some_event на async не хочется, т.к. в некоторых юз кейсах важен порядок обработки ивентов (а в некоторых нет, а в других достаточно дождаться однократного ивента, как в примере выше).

Но детектор говнокода попискивает. Расскажите, как надо. Стоит ли в API класса на каждый event_listener добавить свой async def wait_event()?

Синтаксис async/await использую ~третий раз в жизни

 , , , ,

MyTrooName ()

Sanic долго разрывает подключение

Сейчас балуюсь с асинхронным Sanic и возникла такая проблема - Если к нему делать запросы с keep-alive, то подключения долго закрываются (около 10-15 секунд) и в это время сервер просто не отвечает ни на один запрос. Причем, даже если в самом Sanic выставить KEEP_ALIVE=False. Если же делать запросы без keep-alive -то все нормально и быстро. Может кто с подобным сталкивался? Это баг или я что-то делаю не так?

P.S Да, я знаю, что перед ним можно поставить проксирующий Nginx. Вопрос не для того, чтобы получить такой ответ.

 , , ,

crarkie ()

размер ThreadPoolExecutor

Вот у меня есть асихнхронное приложение на python. Я запускаю N его копий допустим через gunicorn (или uvicorn). N определяется по формуле 1 + количество_логических_ядер * 2. ThreadPoolExecutor нужен для выполнения ресурсоемких операций, которые «вешают основной поток». Как подобрать его оптимальный размер?

 ,

tz4678 ()

Asyncio: ограничить количество запросов за единицу времени

import asyncio
import cgi
import logging
import os
from asyncio import Semaphore
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional, Set
from urllib.parse import urldefrag, urljoin, urlsplit, urlunsplit

from aiohttp import ClientSession, ClientTimeout
from bs4 import BeautifulSoup

__all__ = ('Crawler', 'crawl')

_log = logging.getLogger(__name__.split('.')[0])


class Crawler:
  RATE_LIMIT = 50
  MAX_WORKERS = os.cpu_count()
  ALLOWED_SCHEMES = ('http', 'https')

  def __init__(
    self,
    loop = None,  # Не знаю какой тип указать,
    *,
    session: Optional[ClientSession] = None,
    semaphore: Optional[Semaphore] = None,
    executor: Optional[ThreadPoolExecutor] = None
  ):
    self.loop = loop or asyncio.get_event_loop()
    # DeprecationWarning: The object should be created from async function
    self.session = session or ClientSession(loop=self.loop)
    self.semaphore = semaphore or Semaphore(self.RATE_LIMIT, loop=self.loop)
    self.executor = executor or ThreadPoolExecutor(max_workers=self.MAX_WORKERS)

  async def crawl(
    self,
    url: str,
    cb: Callable,
    *,
    depth: Optional[int] = 3,
    headers: Optional[dict] = {},
    timeout: Optional[float] = 15.0,
  ):
    timeout = ClientTimeout(total=timeout)
    url = self._normalize_url(url)
    await self._fetch(url, cb, depth, headers, timeout, set())

  async def _fetch(
    self,
    url: str,
    cb: Callable,
    depth: int,
    headers: dict,
    timeout: ClientTimeout,
    seen: Set[str],
  ):
    if url not in seen:
      async with self.semaphore:
        try:
          _log.debug('url=%s depth=%s', url, depth)
          async with self.session.get(
            url,
            headers=headers,
            timeout=timeout,
            # allow_redirects=False,
            verify_ssl=False,
          ) as r:
            # Нужно ли нормализовать url или и так схема и netloc в нижнем регистре?
            url = self._normalize_url(str(r.url))
            seen.add(url)
            res = cb(url)
            if asyncio.iscoroutine(res):
              await res

            # Тут может упасть, если у в заголовках ответа Content-Type не будет
            ct, _ = cgi.parse_header(r.headers['Content-Type'])
            if ct == 'text/html' and depth > 1:
              html = await r.text()
              links = await self.loop.run_in_executor(
                self.executor,
                self._extract_links,
                html,
                url
              )
              # _log.debug(links)
              tasks = (
                self._fetch(link, cb, depth - 1, headers, timeout, seen)
                for link in links
              )
              await asyncio.gather(*tasks)
        except Exception as e:
          _log.warn(e)

  def _extract_links(self, html: str, base_url: str) -> set:
    soup = BeautifulSoup(html, 'lxml')
    base = soup.find('base', href=True)
    if base:
      # Вроде как относительным может быть
      base_url = urljoin(base_url, base['href'])

    rv = set()
    for link in soup.find_all('a', href=True):
      url = urljoin(base_url, link['href'])
      if urlsplit(url).scheme in self.ALLOWED_SCHEMES:
        rv.add(self._normalize_url(url))

    return rv

  def _normalize_url(self, url: str) -> str:
    scheme, netloc, path, query, _ = urlsplit(url)
    return urlunsplit((scheme, netloc.lower(), path, query, ''))


async def crawl(*args, **kwargs):
  await Crawler().crawl(*args, **kwargs)


if __name__ == '__main__':
  from argparse import ArgumentParser
  logging.basicConfig(level=logging.DEBUG)
  parser = ArgumentParser()
  parser.add_argument('url', help='URL')
  parser.add_argument('-d', '--depth', default=3, type=int, help='Depth')
  parser.add_argument('-H', '--header', default=[], action='append', help='Header: Value')
  parser.add_argument('-t', '--timeout', default=15.0, type=float, help='Timeout')
  args = parser.parse_args()
  _log.debug(args.header)
  headers = dict(map(str.strip, v.split(':')) for v in args.header)
  asyncio.get_event_loop().run_until_complete(crawl(
    args.url,
    print,
    depth=args.depth,
    headers=headers,
  ))

Запускаю скрипт:

python crawler.py http://habr.com/ -H "User-Agent: Mozilla/5.0"

До второго уровня доходит и повисает. Что делать в душе не е*у. Есть ли те кто глубоко разбирается в asyncio?

 ,

tz4678 ()

Asyncio обработка задач различного типа корутинами.

Добрый день. Возникла необходимость сделать следующее.

На старте мы запускаем, допустим, с десяток корутин. Их задача - брать из очереди задачи и выполнять их. Если же задачи нету, то тогда необходимо просто спать и отдавать управление в event loop. Но к самим задачам, которые мы добавляем, должен так же прибавлятся дополнительный 'тег'. Грубо говоря, метка. При запросе задачи из очереди, мы можем в параметрах передать тег или список тегов, по которым задачи возвращаться не должны. Как такое реализовать? Писать свою очередь? Как вариант - брать задачу из очереди, и если она не нужного нам типа, то тогда просто возвращать ее обратно в очередь. Но мне кажется, что это просто хак и будет работать он очень медленно, т.к перебирать задачи мы можем долго таким образом + гонять их туда-сюда. Второй вариант, приходящий в голову - использовать какие-либо примитивы синхронизации и создавать на каждый тип задачи отдельную очередь, уведомляя о ней корутины. Если сформулировать задачу более формально, то необходимо следующее - мы создаем кучу одинаковых задач, их может быть =< количество корутин. Все должны быть выполнены, но каждая корутина не должна взять больше одной задачи. Подвох в том, что в процессе получения задач из очереди, мы можем добавить еще туда одинаковых задач, но уже «другого» типа.

 , ,

crarkie ()

Called while another coroutine is already waiting for incoming data

Приветствую!

Пишу на питоне интеграцию одного девайса с сервером умного дома home-assistant. Там повсюду asyncio и вечно сталкиваюсь с гонкой доступа к TCP-соединению.

Обмен данными с девайсом происходит в виде текстового чатика по одному TCP-соединению, а стиль програмирования под homeassistant заставляет вкидывать в eventloop таски, не дожидаясь завершения их выполнения..

Условно говоря, есть метод

async def send_command(self, cmd):
    self._cmd_wrter.write(cmd + b'\r\n')
    await self._cmd_wrter.drain()
    buf: bytes = await self._cmd_reader.readline()
    ...

Но, из-за специфики homeassistant, работать с этим методом приходится без await. Грубо говоря:

def callback():
    asyncio.ensure_future(device.send_command("SOME COMMAND"))

callbacks = [callback, callback]

for callback in callbacks:
    callback()

Вылетает ошибка readuntil() called while another coroutine is already waiting for incoming data. Я так понимаю, из-за попытки одновременного вызова readline() из нескольких корутин

Подскажите, что тут можно поделать? Может как-то переписать send_command, чтобы она обрабатывала все запросы последовательно? Хотя вроде перед readline() и так стоит await.. Не могу сообразить.

 , , ,

spoonbob ()

Асинхронный ввод/вывод средствами операционной системы

Всем здравствуйте.

Посмотрел я на API богомерзкой .NET – и там в классе System.IO.File есть семейство методов Read...Async, возвращающих task<T> либо от массива байт (byte[]), либо от строки (string). Заводить фоновый поток исполнения (хоть managed, хоть native – неважно) не требуется. На оффтопике всё это работает через т. наз. Overlapped_I/O, и я предполагаю, что в рамках проектов .NET Core и Mono то же самое API правильно портировано на Linux и BSD.

А потом я посмотрел на реализацию java.nio.channels.AsynchronousFileChannel (и наследников) в стандартной библиотеке Java – и там, наоборот, при создании канала ввода/вывода нужно либо предоставить свой фоновый пул потоков (внутри ExecutorService), либо использовать пул по умолчанию.

И то, и другое означает, что, если к вашему сервису придёт миллион китайцев, то либо китайцы будут ждать, либо сервис ляжет с OOME, не справившись с созданием стотысячного потока.

И вот тут напрашивается вопрос – а почему? Почему в Java нельзя было сделать так же, как в .NET? Ведь есть:

  • kernel-accelerated AIO, O_DIRECT и epoll в Linux;
  • I/O Completion Ports в AIX и Solaris;
  • kqueue() и kevent() в BSD и Mac OS X;
  • наконец, POSIX AIO, стандартное для всех ОС.

 ,

Bass ()

Помогите разобраться в Asyncio.

Добрый день, решил попробовать поюзать aiohttp для вэбсокетов. Нашел примеры реализации простого сервера, с этим все более менее понятно. Но есть потребность работы в связке с RabbitMQ. Нашел aiopika. Запуск тестовых примеров, подключение, примем, отправка сообщений - все работает. Но как теперь подружить их вместе?

Как я вижу подобную реализацию: 1. До начала работы сервера нужно создать подключение к RabbitMQ, чтобы потом этот коннектор можно было передавать в другие таски. 2. Пример работы с вэбсокетами: https://github.com/samael500/aiochat/blob/master/aiochat/chat/views.py Класс (WebSocket) Тут в методе Get обрабатывается новое подключение клиента, один экземпляр этого класса - один клиент. 3. В этом же методе Get мне надо создать очередь и подписаться на нее (для каждого клиента создается отдельная очередь). 4. Слушать сообщения из очереди в этом методе не получится, т.к он слушает входящие данные по вэб сокетам, поэтому создаем таск, в котором будем слушать сообщения из очереди.

    #########
    if self.room.id not in app.wslist:
        app.wslist[self.room.id] = {}

    # генерируем имя очереди
    queue_name = uuid.uuid()
    asyncio.create_task(create_queue(queue_name))

    message = await app.objects.create(
        Message, room=self.room, user=None, text=f'@{user.username} join chat room')
    #######

И сам таск create_queue:

async create_queue(queue_name):
# Creating channel
    channel = await connection.channel()    # type: aio_pika.Channel

    # Declaring queue
    queue = await channel.declare_queue(
        queue_name,
        auto_delete=True
    )   # type: aio_pika.Queue

    async for message in queue:
        with message.process():
            print(message.body)

И столкнулся со следующими вопросами: 1. По идее нам нужен один коннект к RabbitMQ на весь сервер, поэтому до запуска сервера мы должны подключиться к RabbitMQ и потом этот коннектор передавать в таск create_queue. Верно? Но сам метод библиотеки aiopika должен вызываться в сопрограмме, например метод из либы aiohttp для получения сессии можно вызвать с помощью async with, тогда мы получаем коннектор, который спокойно можем передавать в другие сопрограммы, но вот как быть с aiopika? 2. Передача сообщения полученного в RabbitMQ в объект вэб сокета (класс WebSocket). В методе get мы просто создали и запустили таск, который будет слушать сообщения, но как он может оповещать клиента о получении сообщения? Коллбек? Но в asyncio подходе на сколько я понял коллбки это моветон.

С asyncio начал разбираться совсем не давно, учебные примеры, который встречаются в инете направленны, по сути, только на асинхронное получение данных с разных сайтов, в общем, рассматривают только одинаковые задачи, запущенные в разных сопрограммах.

 ,

hell_wood ()

В чем польза asyncio?

Допустим у меня стоит задача опроса состояния GPIO-пина и выполнения некоторых действий в связи с этим.

Я могу поступить по-простому:

завернуть пару-тройку for-while циклов, сделать в них poll состояния и логику того, что делать если состояние изменилось.

А могу использовать asyncio. И я пока не понимаю, чем он отличается от такой же for-loops по большому счету, кроме того что представляет некоторую абстракцию над ними.

Ну то есть читаемость - да, улучшается.

А в плане производительности например, это будет бонус или наоборот? Если будет, то от чего?

 ,

alpha ()

посоветуйте библиотеку для python asyncio для постгреса

$сабж (не для веба)

типа такой вот: https://github.com/fantix/gino

 , , ,

MyTrooName ()

Как прекратить job (полностью вывалиться из него)?

Доброго вечера.

Достаточно глупый вопрос, даже стыдно.

Есть некая очередь с консьюмерами. В процессе обработки таска проводится проверка на соответствие некоторым условиям. В случае соответствия одному из них - дальнейшую обработку таска прерываем.

class Example:
    def __init__(self):
        pass

    async def consumer(self, queue):
        while True:
            i = await queue.get()
            k_list = [1, 2, 3]
            for k in k_list:
                if i == k:
                    queue.task_done()
                    # Как прекратить дальнейшую обработку этого таска?
                    
            await asyncio.sleep(3)  # Псевдонагрузка
            queue.task_done()

    async def producer(self, queue):
        for i in range(0, 100):
            await queue.put(i)

    async def run(self):
        queue = asyncio.Queue()
        consumers = [asyncio.ensure_future(self.consumer(queue)) for _ in range(10)]
        producer = await self.producer(queue)
        await queue.join()

        for consumer_future in consumers:
            consumer_future.cancel()

    def start(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.run())

if __name__ == '__main__':
    Example = Example()
    Example.start()

Сложность в блоке for. Как в окружении asyncio правильнее прервать дальнейшую обработку? break\continue не подходят, т.к. из while'а не выходим, return же при такой реализации подвесит весь поток.

Где я не там свернул?

 ,

Deleted ()

aiomysql: выдает ошибку при попытке импорта

Друзья, простите заранее за нубский вопрос, совсем еще новичок в Python. Пытаюсь написать фреймворк с использованием aiohttp и в качестве библиотеки для асинхронной работы с БД пытаюсь использовать aiomysql. aiomysql установил, но при попытке импорта библиотеки, вылезает вот такая ошибка (pymysql тоже установлено):

Traceback (most recent call last): File «<stdin>», line 1, in <module> File «C:\Users\AAA\AppData\Local\Programs\Python\Python36-32\lib\site-packages\aiomysql\__init__.py», line 32, in <module> from .connection import Connection, connect File «C:\Users\AAA\AppData\Local\Programs\Python\Python36-32\lib\site-packages\aiomysql\connection.py», line 30, in <module> from pymysql.connections import _scramble ImportError: cannot import name '_scramble'

Помогите плиз разобраться с чем связана эта ошибка и как исправить?

И второй вопрос - правильно ли я делаю, что использую aiomysql или есть другие библиотеки для асинхронной работы с Mysql?

Заранее огромное спасибо!

 , , ,

Luhashan ()

Python Developer, Video Processing and Analytics

Я ищу Python разработчика в новый проект Argus компании Constanta, система аналитики спортивных трансляций. Сейчас работают над трекингом объектов, с возможностью подсчета очков и иных событий в различных видах спорта.

Вот тут можно поглядеть на работу системы:

Первоочередная задача состоит в подготовке входных данных (как правило - картинок из видео) для нейронных сетей и постпроцессинге результатов. При этом важно обеспечить быструю работу в реальном времени (в зависимости от вида спорта от 30 до 120 FPS). Кроме этого есть задачи, связанные с редактированием live video, также в планах разработка системы разметки данных на видео для обучения нейросетей.

Требования: Python 3.x, asyncio, опыт оптимизации многопоточного кода.Плюсом будет опыт в обработке изображений.

ЗП от 140-200к на руки, гибкий график, офис м. Бауманская, молодая и сильная команда (ex Tinkoff, Ostrovok etc.), обеды, настольный теннис, диван.

По всем вопросам babavilly@gmail.com

 ,

babavilly ()

Python Aiohttp server: Как сделать response не из handler?

Всем привет! Копаюсь в aiohttp. Возник сабж знатокам aiohttp. Тривиальный код сервера с обработчиком (handler) запроса:

from aiohttp import web

async def hello(request): return web.Response(text=«Hello, world»)

app = web.Application() app.router.add_get('/', hello)

web.run_app(app)

В доках сказано, что обработчик всегда должен возвращать объект Response. Можно как-то ответить на запрос не из обработчика (в данном случае hello(request)), а например в другой корутине, передав например в глобальный массив контекст запроса? Например, в node.js каждый запрос сопровождается двумя объектами, привязанными к контексту – request и response. Соответственно c response можно делать все что угодно и в любом месте сделать response.end(‘hello’). Сорри если туплю, новичок пока еще.

 , , ,

Luhashan ()

Cloud developer (Python), Москва, Ivideon

Ivideon — это самый популярный в мире облачный сервис видеонаблюдения через Интернет. Мы помогаем анализировать бизнес: понимать как часто возникают очереди, сколько людей заходит в магазин, а сколько проходит мимо, какой средний чек, что люди покупают.

Наш фокус - разработка собственных продуктов, которые выпускаются для российского, американского и европейского рынков на 10 языках. За 8 лет работы мы стали облачным сервисом номер один в мире, запустив совместные международные проекты с Philips, Honeywell, Samsung, SoftBank, Hikvision, Dahua и многими другими крупными корпорациями, использующими наше облако как часть своих продуктов. Так что если загуглить «Cloud Video Surveillance» мы будем на почетном первом месте :)

Наша облачная платформа обрабатывает, хранит и стримит видео с сотен тысяч камер по всему миру. У нас собственные подсистемы кодирования видео, распределённое хранилище архива c петабайтами данных, видеоаналитика и ещё десятки микросервисов для разных задач. Всё это работает на стеке Python 3 (Tornado и Asyncio), C++/Asio, MongoDB, Docker, Consul.

Вилка: 150-230 на руки + соцпакет.

Сейчас мы ищем сильных разработчиков, которые:

—Вместе с нами будут разрабатывать бекенд Ivideon;

—Проектировать архитектуру новых сервисов;

—Автотестить написанное - сейчас код покрыт 800 функциональными тестами;

—Разрабатывать API приложений и интеграции.

Что мы ждём от кандидатов: Опыт командной разработки. Желательно Python (писать нужно будет в основном на нём), но бэкграунд в Go/C++/Java тоже может пригодиться. Смелость и навыки проектирования нагруженных сервисов. Умение грамотно выражать свои идеи в коде и документации. Котируем матан и хорошее знание сетей и протоколов.

Собеседуем быстро - знакомимся по скайпу и приглашаем в гости, суммарно пара часов, решение принимаем за один день. Сконтачиться можно по почте - d.chinarev@ivideon.com или в телеге @dchinarev.

 , , , ,

dchinarev ()

python. asincio. Возврат значений из асинхронного кода

Пишу ходилку телнетом по железякам. Ходить она будет параллельно. Количество железяк 50+ .

Делаю на python (либа telnetlib3 на asincio )

Вот пример клиента telnet из документации:

import asyncio, telnetlib3

@asyncio.coroutine
def shell(reader, writer):
    while True:
        # read stream until '?' mark is found
        outp = yield from reader.read(1024)
        if not outp:
            # End of File
            break
        elif '?' in outp:
            # reply all questions with 'y'.
            writer.write('y')

        # display all server output
        print(outp, flush=True)

    # EOF
    print()

loop = asyncio.get_event_loop()
coro = telnetlib3.open_connection('localhost', 6023, shell=shell)
reader, writer = loop.run_until_complete(coro)
loop.run_until_complete(writer.protocol.waiter_closed)

Подскажите, пожалуйста, как правильно вернуть результат работы асинхронной функции shell в синхронный код в конце после run_until_complete.

Через rerutn не подходит, т.к. open_connection возвращает reader, writer

 ,

psm666 ()

Как отправить питон-приложению сигнал завершения работы?

Всем привет! Есть у меня приложение, представляющее из себя веб-сокет-сервер (asyncio, aiohttp), запускается под гуникорном, а гуникорн в свою очередь запускается с помощью systemctl. Как мне грамотно остановить таски в приложении, если приложение не связано с консолью, и ctrl+c использовать не могу? При systemctl gunicorn stop пробрасывается сигнал завершения в приложение? Если да, то какой?

 , ,

a-lexx ()

Работа с сокетами через веб

Пилю веб-управлялку некими железками, взаимодействие с которыми происходит по сокетам (в большинстве случаев через конвертеры rs232 <=> tcp socket).

До последнего времени задача была одна - послать команду, а там трава не расти. С этим успешно справлялось простое приложение на Flask с post-запросами через jquery и socket.send().

Но настала пора слушать что железки вещают и сигнализировать об определенных событиях в веб-морду.

1. Есть ли актуальные best practice как это все делается? Ни с async, ни c обычными потоками я никогда не работал, поэтому не знаю с чего начать.

2. Если брать asyncio, то стоит ли переходить на aiohttp, или из Flask тоже можно работать?

 , , , ,

Turbid ()

Нубский вопрос по python/asyncio/aiohttp/websockets

Всем привет!) Пока разобрался с простейшим эхо сервером:

import asyncio
from aiohttp import web

async def ws_handler(request):
   ws_resp = web.WebSocketResponse()
   await ws_resp.prepare(request)

   async for msg in ws_resp:
       await ws_resp.send_str(msg.data)

   return ws_resp

app = web.Application()
app.router.add_get('/ws', ws_handler)

Как в вышеприведенный код грамотно добавить цикл, в котором сервер будет каждые 10 секунд отправлять сообщение клиенту?

# примерный набросок цикла
while True:
    await ws_resp.send_str('message')
    await asyncio.sleep(10)

Тут интересуют два варианта:
1) Для каждого клиента свой частный цикл, и если соединение с этим клиентом по какой-либо причине закрывается, то частный цикл прекращает свою работу;
2) Один независимый цикл для всех клиентов, прекращает работу вместе с завершением приложения

 , ,

a-lexx ()