LINUX.ORG.RU

Python и гонки при блокировке очереди

 , , ,


0

1

Есть примерно такой код.

import queue, threading

def test(q):
    time.sleep(4)
    c = q.get()
    print('thread2 - got {}'.format(c))
    q.task_done()
    print('thread2 - sending bar')
    q.put("bar")
    q.join()
    print('thread2 - done')

shared_queue = queue.Queue()
t = threading.Thread(target=test, args=(shared_queue, ))
t.start()
print('thread1 - sending foo')
shared_queue.put("foo")
shared_queue.join()
print('thread1 - got {}'.format(shared_queue.get()))
shared_queue.task_done()
t.join()

Т.е. в основном потоке помещаем сообщение в очередь и блокируем его, пока во втором потоке сообщение не достанут и не пометят, как обработанное и тоже самое, но наоборот. Проблема в том, что в большинстве случаев между task_done и помещением сообщения во втором потока, не происходит разблокировки основного потока. Если же сделать небольшую задержку (например, time.sleep), после task_done во втором потоке, то всё выполняется, как и ожидалось.

Вопрос: этого принципиально нельзя избежать, кроме как таким способом, использованием multiprocessing.JoinableQueue или ещё как?

q.put("bar")
...

shared_queue.join()

Queue.join()

Blocks until all items in the queue have been gotten and processed.

Потоки в питоне работают не параллельно (но даже если бы было параллельно, в данном случае была бы возможность фейла всё равно), переключения происходят по таймеру или io, переключения после task_done во втором потоке не происходит и put выполняется раньше join в мейнтреде, следовательно, мейнтред ждёт ещё одного task_done.

WitcherGeralt ★★ ()
Последнее исправление: WitcherGeralt (всего исправлений: 2)

Тебе достаточно убрать join в мейнтреде, ибо get всё равно блокирующий. Но если ты хочешь используя два потока получить что-то типа каналов, то это лишено смысла.

И используя треды при наличии asyncio, ты с большой долей вероятности делаешь что-то не так.

WitcherGeralt ★★ ()
Последнее исправление: WitcherGeralt (всего исправлений: 1)
Ответ на: комментарий от WitcherGeralt

достаточно убрать join в мейнтреде, ибо get всё равно блокирующий

Отдельная блокировка нужна для того, что бы можно было делать так

while not shared_queue.empty():
    msg = shared_queue.get()
    shared_queue.task_done()
    shared_queue.put(handle_message(msg))
    shared_queue.join()
Иначе ты тут же забираешь, что положил в очередь. Можно, конечно, сделать две односторонние очереди, но я как раз думал этого избежать.

WDWTFWW ()