LINUX.ORG.RU

Python Aiohttp server: Как сделать response не из handler?

 , , ,


0

2

Всем привет! Копаюсь в aiohttp. Возник сабж знатокам aiohttp. Тривиальный код сервера с обработчиком (handler) запроса:

from aiohttp import web

async def hello(request): return web.Response(text=«Hello, world»)

app = web.Application() app.router.add_get('/', hello)

web.run_app(app)

В доках сказано, что обработчик всегда должен возвращать объект Response. Можно как-то ответить на запрос не из обработчика (в данном случае hello(request)), а например в другой корутине, передав например в глобальный массив контекст запроса? Например, в node.js каждый запрос сопровождается двумя объектами, привязанными к контексту – request и response. Соответственно c response можно делать все что угодно и в любом месте сделать response.end(‘hello’). Сорри если туплю, новичок пока еще.

как-то ответить на запрос не из обработчика

А как вы себе это представляете? Вот у вас вызывается функция hello, и что дальше вы планируете делать? Как-то ее завершить, где-то сохранить запрос, через какую-то другую функцию его взять , обработать и вернуть ответ, я правильно понял? Что это даст? Почему не вызывать эту другую функцию из hello?

micronekodesu ★★ ()
Ответ на: комментарий от micronekodesu

Ну например я не хочу отвечать на запрос до определенного события. Соответственно мне нужно этот запрос как-то сохранить например (псевдокод): app['requests'].append(request). А когда событие произойдет разом ответить на все накопившиеся в списке запросы.

Luhashan ()

Ну так в ноде ты внутри коллбека всё равно можешь навешивать коллбеки и они должны разрешиться до конечного «return». Также и здесь, ты из hello можешь вызывать что хочешь, главное, чтобы через async/await контекст вовремя вернулся к конечному «return».

menangen ★★★★★ ()
Ответ на: комментарий от menangen

Да это ОК. Но например, если идет какой-то асинхронный процесс длительный, до окончания которого не хочется никому отвечать. После завершения нужно всем ответить. Первое что в голову приходит это сохранить контекст запросов в чем-то типа глобального списка, а потом после завершения длительного процесса всем ответить. Вторая мысль еще более тупая. В контексте app создать типа флага завершен процесс или нет и в цикле while true его проверять например каждую секунду через await asyncio.sleep(1). Первый вариант я не очень понимаю как сделать с API aiohttp, второй мне кажется очень тупым. Поэтому подскажите плиз правильный путь для такой задачи. Еще раз повторюсь, новичок в асинхронном программировании, сорри если вопрос глупый.

Luhashan ()
Ответ на: комментарий от Luhashan

Можно как-то ответить на запрос не из обработчика?

Нельзя.

Ну например я не хочу отвечать на запрос до определенного события.

И что тебе мешает вызвать в обработчике запроса соответствующую корутину?

После завершения нужно всем ответить.

Как вариант asyncio.Queue

vvn_black ★★★★ ()
Ответ на: комментарий от Luhashan

Что клиент будет делать в это время? (inb4: отваливаться по таймауту).

То что тебе нужно называется вебсокеты. Клиент шлет запрос, сервер говорит «ОК, я тебя понял, как будет готово - сообщу», делает там что нужно и потом со своей стороны пихает результаты клиенту. В «обычном» вебе ты так не сделаешь, в лучшем случае сможешь использовать ajax - отправить «задачу» и потом раз в N секунд спрашивать «ну что, готово?».

micronekodesu ★★ ()

Это не так делается. В асинкио код пишется линейно без всяких колбеков. В обработчике вызываешь свою долгую функцию (корутину да) получаешь результат, обрабатываешь и возвращаешь респонз. Все тупо в лоб.

Если не хочешь в лоб то юзай что нибудь другое навроде твистеда или торнады.

redixin ★★★★ ()
Ответ на: комментарий от redixin

Так а нет возможности как-то обойти handler? Это ведь тупо функция, которую мы регистрируем в роутере, соответственно она откуда-то вызывается и возвращает response. Нельзя ли как-то заюзать то место куда посылать response с контекстом запроса после того как response сформирован? Тогда почти полная свобода будет как в node.js. Просто может кто копал aiohttp поглубже?

Luhashan ()
Ответ на: комментарий от redixin

И потом да, это понятно, но если долгая функция например играется сама по себе где-то, например что-то постоянно слушает, типа демон, и ждет события, никак не связанного с запросом?

Luhashan ()
Ответ на: комментарий от Luhashan

Его не нужно копать глубже чтобы понимать какой там API.

Еще раз, асинкио сделан таким образом чтобы писать код в обычном линейном стиле. Вон тебе vvn_black скинул пример того как это делается в асинкио. Все максимально просто и очевидно. и полная свобода тоже присутсвует.

Попробуй объяснить что у тебя не получается сделать с таким подходом. Где оно тебя ограничивает?

redixin ★★★★ ()
Ответ на: комментарий от redixin

Это очевидный подход да, я сам его описывал выше. Но мне показалось, что он туповат что-ли — дергать проверку постоянно некомильфо. Можно тоже самое через события реализовать? Про asyncio.Event почитать?

Luhashan ()
Ответ на: комментарий от Luhashan

Причем тут дергание проверки? Смысл в том что вместо навешивания колбека здесь вызывается «блокирующая» функция. Она может делать что угодно — хоть в цикле проверку дергать, хоть события ждать.

Почитай не только про евент а как минимум про синронизационные примитивы

https://docs.python.org/3/library/asyncio-sync.html

https://docs.python.org/3/library/asyncio-queue.html

И вообще вот это все стоило бы почитать

https://docs.python.org/3/library/asyncio.html

И напиши подробнее какого события откуда нужно ждать. В большинстве случаев это можно сделать либо через событие либо через очередь.

redixin ★★★★ ()
Последнее исправление: redixin (всего исправлений: 1)
Ответ на: комментарий от redixin

Огромное спасибо за наставления и указанный путь. Сейчас все изучим. По поводу проверки: async def handler(request): return await processing(request)

async def processing(request): ... while True: await asyncio.sleep(1) if condition(): break return web.Response() Что это, как не ежесекундная проверка произошло там что-то или нет?

Откуда и какого события: Если абстрактно, то есть две корутины. Вопрос: как им обмениваться данными, кроме как дергать каждую секунду что-то где-то, где они типа должны появиться?

Luhashan ()
Ответ на: комментарий от Luhashan

Дергать каждую секунду это не вариант.

В простейшем случае одна корутина другой может послать сообщение через объект asyncio.Queue. Но нужно понимать что есть get/get_nowait и put/put_nowait, и знать какая между ними разница. Еще нужно обратить внимание на maxsize.

Опять же ты говоришь очень размыто. Например нужно понимать что корутина которая работает в фоне не может слать сообщения в пустоту. Они должны обязательно считываться с другой стороны. Каждое отправленное сообщение будет висеть в очереди пока его не вытащат с другой стороны.

Если очередь не подходит, то есть всевозможные Events, Conditions и старые добрые семафоры.

redixin ★★★★ ()
Ответ на: комментарий от Luhashan
from aiohttp import web
import asyncio

@asyncio.coroutine
async def coro_world():
    print("Start processing...")
    await asyncio.sleep(3)

    return "world"

async def handle(request):
    res = await coro_world()
    return web.Response(text="Hello, " + res)


app = web.Application()
app.add_routes([web.get('/', handle)])

web.run_app(app)

Вместо засыпания - можешь дергать любую херню интенсивную.

menangen ★★★★★ ()

Конкретно по твоему вопросу в начале топика

from aiohttp import web
import asyncio

@asyncio.coroutine
async def coro_world():
    print("Start processing...")
    await asyncio.sleep(3)
    print("Sent.")
    return web.Response(text="Hello, world")

async def handle(request):
    return await coro_world()


app = web.Application()
app.add_routes([web.get('/', handle)])

web.run_app(app)
menangen ★★★★★ ()

Если я правильно тебя понял, то ты хочешь что-то вроде этого

import asyncio
import logging
from http import HTTPStatus

from aiohttp import client, web

logging.basicConfig(level=logging.INFO)

app = web.Application()

client_logger = logging.getLogger('client')
server_logger = logging.getLogger('server')


async def serve_echo(req: web.Request, resp: web.StreamResponse):
    while True:
        data = await req.content.read(3)
        if data:
            server_logger.info('got %s', data)
            await resp.write(data)
            server_logger.info('sent %s', data)
        else:
            server_logger.info('got eof')
            await resp.write_eof()
            server_logger.info('sent eof')
            return


async def echo_handle(req: web.Request):
    server_logger.info('got request')
    resp = web.StreamResponse(
        status=HTTPStatus.OK,
        headers={'content-type': 'text/plain'}
    )
    await resp.prepare(req)
    await serve_echo(req, resp)
    server_logger.info('echo served')
    return resp

app.router.add_post('/echo', echo_handle)


async def echo_streaming(future_resp, done_event):
    resp = await future_resp

    client_logger.info('sending xxx')
    yield b'xxx'
    data = await resp.content.read(3)
    client_logger.info('got %s from server', data)

    client_logger.info('sending yyy')
    yield b'yyy'
    data = await resp.content.read(3)
    client_logger.info('got %s from server', data)

    done_event.set()


async def get_echo():
    async with client.ClientSession() as session:
        future_resp = asyncio.Future()
        done_event = asyncio.Event()
        resp_ctx = session.post(
            'http://localhost:8080/echo',
            data=echo_streaming(future_resp, done_event)
        )
        async with resp_ctx as resp:
            future_resp.set_result(resp)
            await done_event.wait()


async def main():
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, host='0.0.0.0', port=8080)
    await site.start()
    await get_echo()
    await runner.cleanup()

asyncio.get_event_loop().run_until_complete(main())

У меня этот код печатает

INFO:server:got request
INFO:client:sending xxx
INFO:server:got b'xxx'
INFO:server:sent b'xxx'
INFO:client:got b'xxx' from server
INFO:client:sending yyy
INFO:server:got b'yyy'
INFO:server:sent b'yyy'
INFO:client:got b'yyy' from server
INFO:server:got eof
INFO:server:sent eof
INFO:server:echo served
INFO:aiohttp.access:127.0.0.1 [14/Jun/2018:23:38:27 +0000] "POST /echo HTTP/1.1" 200 165 "-" "Python/3.6 aiohttp/3.1.3"

Но оно тебе не нужно.

ival ★☆ ()

long poll proof of concept

import logging
import asyncio
from http import HTTPStatus

from aiohttp import web, client

logging.basicConfig(level=logging.INFO)

client_logger = logging.getLogger('client')
server_logger = logging.getLogger('server')

client_ids = ['client_{0}'.format(i) for i in range(1, 4)]

app = web.Application()
app['trigger'] = asyncio.Condition()
app['clients'] = set()
app['clients_ready'] = asyncio.Event()


async def wait_trigger(request: web.Request):
    server_logger.info('get wait request')
    request.app['clients'].add(request.query['client_id'])
    if len(request.app['clients']) == len(client_ids):
        request.app['clients_ready'].set()
    await request.app['trigger'].acquire()
    await request.app['trigger'].wait()
    request.app['trigger'].release()
    return web.Response(status=HTTPStatus.NO_CONTENT)


app.router.add_get('/wait', wait_trigger)


async def fire_trigger(request: web.Request):
    trigger = request.app['trigger']
    await trigger.acquire()
    trigger.notify_all()
    trigger.release()
    return web.Response(status=HTTPStatus.NO_CONTENT)


app.router.add_post('/fire', fire_trigger)


async def client_flow(client_id):
    logger = client_logger.getChild(str(client_id))
    async with client.ClientSession() as session:
        logger.info('creating request')
        ctx = session.get(
            'http://localhost:8080/wait',
            params={'client_id': client_id}
        )
        async with ctx as resp:
            logger.info('got the response')
            assert resp.status == HTTPStatus.NO_CONTENT
    logger.info('done')


async def do_long_poll():
    loop = asyncio.get_event_loop()
    tasks ={
        client_id: loop.create_task(client_flow(client_id))
        for client_id in client_ids
    }

    client_logger.info('waiting for clients to start')
    await app['clients_ready'].wait()

    client_logger.info('waiting for 3 seconds')
    await asyncio.sleep(3)

    client_logger.info('fire')
    async with client.ClientSession() as session:
        async with session.post('http://localhost:8080/fire') as resp:
            client_logger.info('fired')

    client_logger.info('waiting for clients to finish')
    for task in tasks.values():
        await task


async def main():
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, host='0.0.0.0', port=8080)
    await site.start()
    await do_long_poll()
    await runner.cleanup()


asyncio.get_event_loop().run_until_complete(main())

Выхлоп

INFO:client:waiting for clients to start
INFO:client.client_1:creating request
INFO:client.client_2:creating request
INFO:client.client_3:creating request
INFO:server:get wait request
INFO:server:get wait request
INFO:server:get wait request
INFO:client:waiting for 3 seconds
INFO:client:fire
INFO:aiohttp.access:127.0.0.1 [15/Jun/2018:11:55:28 +0000] "POST /fire HTTP/1.1" 204 157 "-" "Python/3.6 aiohttp/3.2.1"
INFO:aiohttp.access:127.0.0.1 [15/Jun/2018:11:55:25 +0000] "GET /wait?client_id=client_1 HTTP/1.1" 204 157 "-" "Python/3.6 aiohttp/3.2.1"
INFO:aiohttp.access:127.0.0.1 [15/Jun/2018:11:55:25 +0000] "GET /wait?client_id=client_2 HTTP/1.1" 204 157 "-" "Python/3.6 aiohttp/3.2.1"
INFO:aiohttp.access:127.0.0.1 [15/Jun/2018:11:55:25 +0000] "GET /wait?client_id=client_3 HTTP/1.1" 204 157 "-" "Python/3.6 aiohttp/3.2.1"
INFO:client:fired
INFO:client:waiting for clients to finish
INFO:client.client_1:got the response
INFO:client.client_1:done
INFO:client.client_2:got the response
INFO:client.client_2:done
INFO:client.client_3:got the response
INFO:client.client_3:done
ival ★☆ ()
Ответ на: комментарий от ival

Все вкупил, доки почитал. Очень круто :) И большое вам спасибо. Иногда одного конкретного примера достаточно, чтобы во всем сложном разобраться. И в целом, друзья, всем огромное спасибо, кто откликнулся. Не ожидал столь профессиональной и доброжелательной реакции. В случае long poll остался вопрос как лучше обрабатывать таймаут. Самое тупое что в голову приходит обрабатывать исключение по таймауту, освобождать Condition и посылать ответ клиенту о реконнекте и потом все по новой. Или после исключения будет уже не ответить, соединения не будет?

Luhashan ()
Ответ на: комментарий от Luhashan

Или после исключения будет уже не ответить, соединения не будет?

При исключении в handle'е клиенту вернется пятисотка. Его можно обработать и вернуть что-то более осмысленное. Если по таймауту отвалится клиент, то aiohttp пробросит в handle какое-то исключение (не помню точно какое)

ival ★☆ ()