LINUX.ORG.RU

Смешивание асинхронного и синхронного кода в Python

 ,


1

3

Допустим, есть приложение на flask + socketio + eventlet. Внутри обработчика сообщения от socketio я могу писать какой-нибудь синхронный код. Например, сделать print или отправить ответ клиенту. Для конкретики вот примерный мой код:

import eventlet
import eventlet.wsgi
import socketio

from flask import Flask, render_template


sio = socketio.Server()
app = Flask(__name__)


@app.route('/')
def main():
    return render_template('main.html')


@sio.on('my-msg')
def handle_login(sid, data):
    print('my-msg: ' + str(data))


if __name__ == '__main__':
    app = socketio.Middleware(sio, app)
    eventlet.wsgi.server(eventlet.listen(('', 8000)), app)

Однако, мне нужно работать с библиотекой, которая поддерживает только асинхронный стиль программирования. Не на базе eventlet или чего-то подобного, а на базе непосредственно yield from и т. п. Каким образом я могу делать к ней обращения внутри обработчика сообщения?

Пока приходит в голову, что нужно запустить в отдельном потоке eventloop от asyncio и запускать coroutine в нём при приходе сообщения. В свою очередь по окончании работы coroutine нужно как-то вызвать sio.emit (но это может быть не безопасно из-за того, что я делаю это из другого потока).

★★★★★

Обычно есть синхронные helper-ы. Например, для tornado есть run_sync

http://www.tornadoweb.org/en/stable/ioloop.html#tornado.ioloop.IOLoop.run_sync

Для asyncio, вроде как (сам с asyncio не работал), должно работать run_until_complete

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEven...

Vovka-Korovka ★★★★★ ()
Последнее исправление: Vovka-Korovka (всего исправлений: 1)
Ответ на: комментарий от Vovka-Korovka

У меня немного другой случай. eventlet тоже не совсем синхронный - он event-driven, насколько я понимаю. То есть там главный цикл ждёт всяких событий и вызывает нужные обработчики. Если я подвешу обработчик события, то другие события не будут обрабатываться. А это плохо.

Мне нужно как-то запустить асинхронный код, при этом чтобы он не остановил выполнение eventlet. А когда асинхронный код выполнится, мне необходимо сделать sio.emit(...).

По факту и то, и то - асинхронное программирование, но реализованное по-разному (asyncio позволят писать типа синхронно, но на самом деле асинхронно, eventlet заставляет самостоятельно делать машину состояний).

KivApple ★★★★★ ()
Ответ на: комментарий от KivApple

Ну тогда вариантов два

1. Наиболее правильный - искать/писать переходники между разными event-loop-ами. 2. Второй вариант - то, что ты описал. Запускать в отдельном потоке.

Vovka-Korovka ★★★★★ ()

Я как всегда не въехал в суть проблемы, но ты можешь использовать разные потоки и очереди.

Только вот надо понять какая очередь тебе подойдёт. Если ты передаёшь из асинхронного кода в синхронный то queue.Queue. Если наоборот... то хз, asyncio.Queue не подойдёт, тебе нужен анус (янус?): http://stackoverflow.com/questions/32889527/is-there-a-way-to-use-asyncio-que... . Либо call_soon_threadsafe(), но я им пока ещё не пользовался.

true_admin ★★★★★ ()
Ответ на: комментарий от true_admin

Вот я сейчас как раз попытался вызвать call_soon_threadsafe:

from datetime import timedelta
import eventlet
import eventlet.wsgi
import socketio
import asyncio
from threading import Thread
from flask import Flask, render_template


sio = socketio.Server()
app = Flask(__name__)
loop = asyncio.new_event_loop()


@app.route('/')
def main():
    return render_template('main.html')


@sio.on('my-msg')
def handle_my_msg(sid, data):
    loop.call_soon_threadsafe(handle_my_msg_async, (data,))


@asyncio.coroutine
def handle_my_msg_async(data):
    print(str(data))


def run_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


if __name__ == '__main__':
    loop_thread = Thread(target=run_loop, args=(loop,))
    loop_thread.start()
    app = socketio.Middleware(sio, app)
    eventlet.wsgi.server(eventlet.listen(('', 8000)), app)

В результате получаю ошибку:

TypeError: coroutines cannot be used with call_soon()

ЧЯДНТ?

За идею с очередью спасибо. Думаю, её стоит использовать для передачи данных обратно. В смысле, что мне нужно как-то вызвать sio.emit, чтобы отдать ответ клиенту и скорее всего это можно сделать только из цикла eventlet.

KivApple ★★★★★ ()
Последнее исправление: KivApple (всего исправлений: 1)
Ответ на: комментарий от KivApple

Я тебе 100% правильного ответа не дам т.к. давно уже с asyncio не баловался. Попробуй убрать декоратор asyncio.coroutine.

true_admin ★★★★★ ()
Ответ на: комментарий от true_admin

Нашёл ответ: http://stackoverflow.com/questions/29296064/python-asyncio-how-to-create-and-...

Написал на его основе небольшой модуль:

from threading import Thread, current_thread, Event
import asyncio
import functools
from concurrent.futures import Future


class AsyncThread(Thread):
    def __init__(self, start_event=Event()):
        Thread.__init__(self)
        self.loop = None
        self.tid = None
        self.event = start_event

    def run(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        self.tid = current_thread()
        self.loop.call_soon(self.event.set)
        self.loop.run_forever()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

    def add_task(self, coro):
        """this method should return a task object, that I
          can cancel, not a handle"""
        def _async_add(func, fut):
            try:
                ret = func()
                fut.set_result(ret)
            except Exception as e:
                fut.set_exception(e)

        f = functools.partial(asyncio.async, coro, loop=self.loop)
        if current_thread() == self.tid:
            return f() # We can call directly if we're not going between threads.
        else:
            # We're in a non-event loop thread so we use a Future
            # to get the task from the event loop thread once
            # it's ready.
            fut = Future()
            self.loop.call_soon_threadsafe(_async_add, f, fut)
            return fut.result()

    def cancel_task(self, task):
        self.loop.call_soon_threadsafe(task.cancel)

    def wait(self):
        self.event.wait()

Теперь можно делать так:

async_thread = AsyncThread()
async_thread.start()
async_thread.wait()
...
@sio.on('my-msg')
def handle_my_msg(sid, data):
    async_thread.add_task(handle_my_msg_async(data))

@asyncio.coroutine
def handle_my_msg_async(data):
    print(str(data))
KivApple ★★★★★ ()
Ответ на: комментарий от KivApple

Да, оно, по-сути, создаёт таск, поэтому работает. Сам по себе короутин/генератор не может исполняться напрямую в event loop, его нужно обернуть в task или future.

Я бы попробовал тупо вызвать create_task() на короутин и скормить его call_soon_threadsafe . Тот код что ты показал хорош, но как-то слишком наворочено, ведь твою проблему можно решить в одну строчку. Ну, это тебе решать.

true_admin ★★★★★ ()
Ответ на: комментарий от KivApple

А может я и нагнал про create_task. Asyncio работает слишком мудрёно для меня, я без доков и экспериментов не скажу будет это работать или нет.

true_admin ★★★★★ ()
Ответ на: комментарий от true_admin

Теперь главная проблема как вызвать sio.emit. Если я это делаю прямо из асинхронного кода, то ничего клиенту не приходит. Если делаю прямо из обработчика события socketio, то приходит.

То есть по сути дела мне нужно действие аналогичное call_soon_threadsafe, но для eventlet.

KivApple ★★★★★ ()
Последнее исправление: KivApple (всего исправлений: 1)
Ответ на: комментарий от KivApple

Я не знаю что такое eventlet и прочее, зачем оно тебе вообще нужно? Если для http и вебсокетов то просто юзай aiohttp.

true_admin ★★★★★ ()
Ответ на: комментарий от true_admin

Flask вроде как не умеет его использовать. Только один из более старых event loop'ов.

KivApple ★★★★★ ()
Ответ на: комментарий от KivApple

Так я про то что flask может быть вообще не нужен :). aiohttp поддерживает вебсокеты, сессии, авторизацию сам прикрутишь :). Шаблоны рендерить тоже не проблема. Нужен ли тебе фласк вообще?

true_admin ★★★★★ ()
Ответ на: комментарий от true_admin

Уже так подумал и выпилил flask. Для вебсокетов использую sockjs вместо socket.io (всё же по сравнению с обычными вебсокетам мне нравится, что в случае проблем клиент перейдёт на более старые технологии).

Только вот не могу понять, можно ли заставить здешний роутер на все непонятные запросы (по факту всё, что не начинается на /static/) отдавать /static/index.html (там одностраничное приложение, которое использует History API).

KivApple ★★★★★ ()
Последнее исправление: KivApple (всего исправлений: 3)
Ответ на: комментарий от KivApple

Может, там есть дефолтный роутер или можно тупо перехватить 404?

true_admin ★★★★★ ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.