LINUX.ORG.RU

Очереди и многопоточность в python

 , ,


2

3

Сразу к делу.

Есть сервер. Сначала думал над тем, как вручную реализовать, потом наткнулся на http://docs.python.org/py3k/library/socketserver.html и решил, что это то, что мне надо.

Собственно, вся реализация сервера:

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass

Обрабатывает все запросы некий класс

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def handle(self):
        data = str(self.request.recv(1024), 'ascii')
        cur_thread = threading.current_thread()
        response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
        self.request.sendall(response)

Всё работает как надо (если скопировать пример из дока). Нюанс вот в чём. Мне нужно чтобы сервер обслуживал многих клиентов, а они могли выполнять какие-то операции. Например, пользователь хочет зарегистрироваться - для этого клиент посылает специальный запрос, сервер его обрабатывает, и он должен добавить запись в базу данных (использую sqlite).

Но sqlite нельзя использовать из множества потоков!! Решить проблему можно так - выбрать один поток, который и будет писать в базу данных и выполнять все операции. Можно, например взять очередь http://docs.python.org/py3k/library/queue

сделал примерно так, создаю очередь и делаю её полем класса:

tasks = queue.Queue()
ThreadedTCPRequestHandler.tasks = tasks

Также передаю в специальный класс обработчик, которую запускаю в отдельном потоке (и этот поток всегда одинаковый, запускаю его ОДИН раз)

worker = Worker(tasks)

Ну реализация примерно такая

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def handle(self):
        data = str(self.request.recv(1024), 'ascii')
        ThreadedTCPRequestHandler.tasks.put(data)
        # что посылать в ответ??
        self.request.sendall(???)

class Worker:

    def __init__(self, tasks):
        self.tasks = tasks

    def do(self):
        while True:
            item = q.get()
            # делаю тут что-то, например, работаю с базой данных sqlite
            q.task_done()

Собственно, мне нужно узнать результат выполнения, выполнена ли команда успешно или вообще не выполнена... и вернуть результат клиент (через sock.sendall(b'что-то'). Как это сделать? %) мне создать ещё одну очередь? такой вариант наверное не прокатит...

Я пробовал просто отправлять в очередь tasks = queue.Queue() экземпляры сокетов (и делать sock.recv/sock.send уже из класса Worker) - но такой вариант вообще не работает, сокет становится закрытым... то есть с сокетом можно получается работать только из того потока, который обслуживает клиента.

Что делать? %)

п.с., не знаю почему последние три листинга без подсветки кода... везде писал code=python...

BattleCoder ★★★★★ ()

мне создать ещё одну очередь? такой вариант наверное не прокатит...

Можно, например, в каждом handler-е создавать его уникальную очередь, и класть её вместе с данными в tasks, а внутри worker-а, после работы с sql-ем, отсылать в эту очередь ответ.

Т.е.:

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def handle(self):
        data = str(self.request.recv(1024), 'ascii')
        q = queue.Queue()
        ThreadedTCPRequestHandler.tasks.put([data, q])
        answer = q.get()
        # do something, close queue etc
        self.request.sendall(answer)

class Worker:

    def __init__(self, tasks):
        self.tasks = tasks

    def do(self):
        while True:
            item, handler_queue = q.get()
            # делаю тут что-то, например, работаю с базой данных sqlite
            handler_queue.put(sql_result)
            q.task_done()
anonymous ()

Всё работает как надо (если скопировать пример из дока). Нюанс вот в чём. Мне нужно чтобы сервер обслуживал многих клиентов, а они могли выполнять какие-то операции. Например, пользователь хочет зарегистрироваться - для этого клиент посылает специальный запрос, сервер его обрабатывает, и он должен добавить запись в базу данных (использую sqlite).

Возможен ли вариант использовать вместо sqlite сетевую СУБД, такую как mysql или postgres? Тогда проблемы с записью в базу данных не будет вовсе.

dicos ()

Я не знаю тонкостей питона и sqlite, но, если sqlite поддерживает одновременную работу нескольких клиентов, в каждом заинтересованном потоке-обработчике я бы создал отдельно по подключению к бд.

staseg ★★★★★ ()
Ответ на: комментарий от dicos

есть вариант создавать соединения от каждого клиента для каждого запроса, но мне такой вариант почему-то не понравился... наверное я поторопился. почему-то кажется, что от такого числа соединений будут издержки.

ну на отказутойчивость всё равно не претендую, приложение чисто учебное =) вряд ли заменит XMPP/IRC.

BattleCoder ★★★★★ ()
Ответ на: комментарий от anonymous

Аа, действительно свежее решение. :) мог бы догадаться... создавать уникальную очередь для задания, и класть ссылку на неё в основную очередь. Работать будет.

Но попробую лучше сначала вариант с созданием соединения из каждого треда, а там может и переделаю.

BattleCoder ★★★★★ ()
Ответ на: комментарий от anonymous

хотя.... с другой стороны, ТАКОЙ подход лишает меня вообще практически проблем с блокировками и синхронизацией... всю работу выполняет один поток.

прироста производительности от многопоточности тут всё равно не будет, поэтому одного потока более чем достаточно.

я передумал, сначала попробую этот вариант :) а там если не получится буду думать

BattleCoder ★★★★★ ()
Ответ на: комментарий от anonymous

ThreadedTCPRequestHandler.tasks.put([data, q])

кстати, критично ли в этой строке список передавать? или можно кортеж? наверное кортежем лучше будет?..

BattleCoder ★★★★★ ()
Ответ на: комментарий от BattleCoder

прироста производительности от многопоточности тут всё равно не будет, поэтому одного потока более чем достаточно.

это можно было сказать сразу после слова python.

maloi ★★★★★ ()
Ответ на: комментарий от maloi

советуют использовать много процессов, если нужен прирост.

да, python не годится для задач, где нужна производительность, это я сразу понял. :) именно это задание мне надо сделать на нём.

BattleCoder ★★★★★ ()

ты можешь использовать threading.Event чтобы посылать уведомление когда нужный запрос выполнится. Но если у тебя простые быстрые запросы то почему бы лок не ставить на базу.

Ну и sqlite держит несколько нитей. Начни копать отсюда: http://stackoverflow.com/questions/393554/python-sqlite3-and-concurrency

Возможно придётся каждой нити создать по соединению или курсору, этого я уже не знаю.

true_admin ★★★★★ ()

Но sqlite нельзя использовать из множества потоков!!

use mysql. Не?

А вообще говоря, поищи какое-нить расширение питона с sqlite, наверняка уже кто-то придумал...

drBatty ★★ ()
Ответ на: комментарий от staseg

если sqlite поддерживает одновременную работу нескольких клиентов

каким образом, если оно с одним файлом?

drBatty ★★ ()
Ответ на: комментарий от BattleCoder

советуют использовать много процессов, если нужен прирост.

в данном случае глупость - БД лежит в файле, и кешируется в памяти. Файл один, память тоже одна. Никаких вычислений тут почти нет. Вот если-бы у каждого процессора была хотя-бы своя память...

drBatty ★★ ()
Ответ на: комментарий от drBatty

каким образом, если оно с одним файлом?

а что это меняет? 1) mysql, например, может все innodb-таблицы хранить в одном файле 2) oracle умеет работать с raw-разделами, там всё «одним девайсом».

В общем, не проблема это.

true_admin ★★★★★ ()
Ответ на: комментарий от drBatty

БД лежит в файле, и кешируется в памяти. Файл один, память тоже одна. Никаких вычислений тут почти нет. Вот если-бы у каждого процессора была хотя-бы своя память...

а тут затык не в скорости доступа к памяти, а в тормозах питона.

true_admin ★★★★★ ()
Ответ на: комментарий от BattleCoder

кстати, критично ли в этой строке список передавать? или можно кортеж? наверное кортежем лучше будет?..

Кортеж даже лучше. Список просто читабельнее выглядит, потому для примера и приведён:)

А вообще, вряд ли это заметно повлияет на производительность, так что пофигу.

anonymous ()
Ответ на: комментарий от drBatty

проблему можно считать уже решил, вариант с очередями меня вполне устраивает и освобождает от необходимости делать блокировки, удобно. и вся работа выполняется в одном треде (а остальные треды хватают данные от клиента, просто ему эту работу передают, а потом отправляют результат)

появилась новая проблема. :) мне нужна авторизация... например,клиент отправляет запрос (использую json-rpc), например вызывает функцию login с параметрами username и password. Я это проверяю на стороне сервера и убеждаюсь, что имя и пароль правильные..

а дальше что? %) как запомнить авторизацию? как запомнить, какой клиент авторизован, а какой нет, как их отличать?

например посылает клиент запрос с функцией postMessage и параметрами (username, message), как мне убедиться, что он и есть тот username, а не другой?

На стороне клиента это конечно можно предусмотреть, но такой вариант точно неправильный, надо на стороне сервера.

BattleCoder ★★★★★ ()
Ответ на: комментарий от anonymous

с кортежем всё работает. а разницы в таком мелком случае наверное и нет

BattleCoder ★★★★★ ()
Ответ на: комментарий от BattleCoder

дальше что? %) как запомнить авторизацию? как запомнить, какой клиент авторизован, а какой нет, как их отличать?

При авторизации клиента генеришь ему уникальный id, который он должен будет передавать при каждом запросе. Ну а у себя уже хранишь список этих id и привязанные к ним данные, вроде логинов, паролей, прав и т.д. Эдакий cookie на коленке. Передавать его от клиента к серверу в header-е, например. Реализации такого решения в гугле можно посмотреть.

Чтобы данные не утекали кому не надо, лучше соединение шифровать.

anonymous ()
Ответ на: комментарий от anonymous

уже сам до того же самого догадался... генерирую случайное число, проверяю уникальность - и передаю клиенту, и потом клиент его использует.

эх, книжек надо было больше читать в своё время, а не сейчас.

шифрование мне наверное некритично, но может потом добавлю, если время останется.

BattleCoder ★★★★★ ()
Ответ на: комментарий от BattleCoder

генерирую случайное число, проверяю уникальность - и передаю клиенту, и потом клиент его использует.

В питоне есть модуль с uuid, чтобы облегчить такое дело.

anonymous ()
Ответ на: комментарий от anonymous

Осталась ещё одна нерешённая проблема.

К python имеет мало отношения, скорее к сетевому программированию.

Написал методы, которые позволяют зарегистрироваться пользователю (добавляется в базу данных), зарегистрировать комнату, зайти в комнату, отправить сообщение в комнату и т.п...

на паре простых примерах проверил (набросал консольный клиент, отправил команды) - работает.

Но - нюанс ещё в том, что необходимо ВСЕХ текущих клиентах оповещать об изменениях на сервере. Например, пользователь добавил сообщение в чат - все остальные пользователи должны об этом знать... и должны автоматически загружать такое сообщение с сервера (на сервере оно хранится, например, в обычном списке, в объекте «комнаты»).. то есть сервер должен отправлять всем клиентам это.

Но сервер прослушивает сокет (через который клиент отправляет команды), поэтому не может через этот же сокет сообщения отправлять. Как мне решить такую проблему? создать ещё один сокет (на соседнем порту, например), или есть другой вариант, более правильный?

BattleCoder ★★★★★ ()
Ответ на: комментарий от BattleCoder

Но сервер прослушивает сокет (через который клиент отправляет команды), поэтому не может через этот же сокет сообщения отправлять.

Может.

anonymous ()
Ответ на: комментарий от anonymous

значит я что-то неправильно понял. как может?.. в другом потоке, например? тот же самый сокет? %) можно на примере пояснить?

BattleCoder ★★★★★ ()
Ответ на: комментарий от BattleCoder

в другом потоке, например? тот же самый сокет?

Да. Сокеты ведь full-duplex, один поток слушает, второй пишет.

anonymous ()
Ответ на: комментарий от anonymous

Меня ещё кое-что смущает. Реализация сервера из пакета socketserver странная - она позволяет клиенту отправить одно сообщение sock.sendall(???), и всё.. больше сообщений отправлять нельзя. то есть не действует схема «отправил-принял-отправил-принял-...», он закрывает соединение сам.

Это так и должно быть? По ходу придётся все-таки свой сервер реализовывать. Если я хочу клиентам посылать сообщения через тот же сокет, мне надо сохранять с ними соединение.

BattleCoder ★★★★★ ()
Ответ на: комментарий от BattleCoder

Меня ещё кое-что смущает. Реализация сервера из пакета socketserver странная - она позволяет клиенту отправить одно сообщение sock.sendall(???), и всё.. больше сообщений отправлять нельзя. то есть не действует схема «отправил-принял-отправил-принял-...», он закрывает соединение сам.

Реализация socketserver тут ни при чём. Это ты сам читаешь из сокета один раз и отсылаешь ответ, после чего твоя функция заканчивается.

Тебе надо что-то такое:

    def handle(self):
        cur_thread = threading.current_thread()
        white True:
            data = str(self.request.recv(1024), 'ascii')
            if not data: break
            #....

Далее уже сам выдумывай, как сделать тред с записью чужих сообщений в этот сокет, тут можно по-разному делать.

А вообще, посмотри, может тебе питоньи asyncore и asynchat подойдут лучше.

anonymous ()
Ответ на: комментарий от anonymous

ок, понял...

решил попробовать по тому же сокету послать сообщение. Не получается.

Разделил логику примерно так. Клиент может посылать серверу request, сервер в ответ присылает response (в формате like json-rpc). кроме того, сервер может посылать notification (клиент в этом случае обратно ничего не посылает).

request работает нормально, response тоже.

Для отправки notification сделал следующую схему - создал ещё одну queue - в неё складываю все уведомления, есть один поток, который запускается при загрузке сервера и прослушивает эту очередь - и посылает через тот же сокет сообщения.

Получаю сообщение «A request to send or receieve data was disaallowed because the socket is not connected and (when se..... дальше неинтересно»

Логично, сокет я ни с кем не соединял. С этого сокета я прослушиваю соединение. С кем мне надо его соединять? Я хочу ВСЕМ разослать сообщение (всем подключённым на данный момент клиентам). Как проще всего?

Код:

class _Connection(threading.Thread):
    def __init__(self, sock, addr, requests):
        self.sock = sock
        self.addr = addr
        # очередь запросов)
        self.requests = requests
        super(_Connection, self).__init__()
    def run(self):
        while True:
            request = self.sock.recv(1024).decode('utf-8')
            result = queue.Queue()
            # получаем запрос и складываем его в очередь
            # очередь обрабатывает другой поток
            self.requests.put((request, result))
            self.sock.sendto(result.get(), self.addr)
            result.task_done()
        self.sock.close()

class _Broadcaster(threading.Thread):
    def __init__(self, sock, notifications):
        self.sock = sock
        # очередь уведомлений, другой поток складывает уведомление в неё
        # этот поток берёт и рассылает
        self.notifications= notifications
        super(_Broadcaster, self).__init__()
    def run(self):
        while True:
            # вот на этой строке ошибка... что исправить?
            self.sock.sendall(self.notifications.get())
            self.notifications.task_done()

class MessageServer(object):
    def __init__(self, address, port, requests, notifications):
        self.address = (address, port)
        self.requests = requests
        self.notifications = notifications
        
    def start(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(self.address)
        sock.listen(5)
        _Broadcaster(sock, self.notifications).start()
        while True:
            s, a = sock.accept()
            _Connection(s, a, self.requests).start()
BattleCoder ★★★★★ ()
Ответ на: комментарий от BattleCoder

Логично, сокет я ни с кем не соединял. С этого сокета я прослушиваю соединение. С кем мне надо его соединять? Я хочу ВСЕМ разослать сообщение (всем подключённым на данный момент клиентам). Как проще всего?

Храни список соединений где-либо. При создании инстанса _Connection клади его в список, а когда надо отсылать сообщение - пробегайся по списку и шли каждому конкретному соединению.

Что-то такое:

class _Connection(.....):
    #....
    def send_msg(self, msg):
        self.sock.sendall(msg)
    #...

#... skip ...
class _Broadcaster(threading.Thread):
    def __init__(self, ..., clients):
        #...
        self.clients = clients
        #...

    def run(self):
        while True:
            #...
            for client in self.clients:
                client.send_msg(message)
anonymous ()
Ответ на: комментарий от anonymous

спасибо, аноним

вроде всё просто оказывается, а догадаться тяжело :)

в сухой документации таких примеров мало приводится.

наверное книжку надо было в своё время читать по теории сетей... была у меня где-то в pdf валялась... кажется «разработка сетевых приложений под UNIX, Стивенс»... а сейчас вот на такие книжки тупо времени нет, а они толстые.

BattleCoder ★★★★★ ()

Всем спасибо, клиент и сервер работают. :) Как надо. осталось дореализовать нужные команды и написать gui... ну в исправить ошибки в коде, который на стороне базы данных... чтобы всё это хозяйство «хоть как-то» работало.

BattleCoder ★★★★★ ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.