LINUX.ORG.RU

python queue thread закрытие треда

 , ,


1

3
from Queue import Queue
from threading import Thread
import time

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
                self.stop()
            finally:
                self.tasks.task_done()

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads): Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        # pass
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        # print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(48)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)
    pool.wait_completion()

    while True:
        print 'aaa1:', pool
        time.sleep(1)

После инструкции pool.wait_completion() хотелось, чтобы все треды умирали. А они не умирают. Как это исправить ?

★★★★

Сделать вручную через какую-нибудь conditional variable или флаг который бы проверялся в треде. Либо в очередь слать специальный флаг «треду завершиться» (я так делаю). Короче, посмотри гугл, есть готовые рецепты. Например: http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-i...

true_admin ★★★★★
()
Ответ на: комментарий от true_admin

Неа, я здаюсь. Гугл ничего не дает. Покажи как надо)

bryak ★★★★
() автор топика

короче ситуация такова, что при отработке тредов - треды остаются waiter.acquire(). Как их закрыть без закрытия main - остается загадкой

bryak ★★★★
() автор топика
Ответ на: комментарий от bryak

Ну в первом же ответе здесь написали всё.

class Worker(Thread):
    ...
    def run(self):
        while True:
            f_a = self.tasks.get()
            if f_a is None:
                return
            func, args, kargs = f_a
            ...

class ThreadPool:
    def __init__(self, num_threads):
        self.num_threads = num_threads
        ...

    ...

    def stop_all(self):
        for i in range(self.num_threads):
            self.tasks.put(None)

Т.е. каждый поток выберет себе None и закроется. Тоже делаю так.

ximeric
()
Ответ на: комментарий от bryak

Шутить изволите :-)

У меня моё работает. Остальные тестируют своё сами.

ximeric
()
Ответ на: комментарий от ximeric

каждый поток выберет себе None

Меня этот момент немного смущает :). А что если кол-во потоков неизвестно? Я бы сделал так: если в поток прилетел «None» то кладём его обратно в очередь и завершаемся. Потом, когда все потоки завершились (можно дождаться через threadpool.join()) мы очищаем очередь (достаём последний None).

Но, наверно, это оверинжиниринг. В реальности у меня кол-во воркеров обычно известно и фиксировано.

true_admin ★★★★★
()

В потоке запускаем:

    for work in iter(queue.get, STOP):
        work()

где STOP это любая константа, помещаем её в очередь когда надо завершить воркера.

Leron ★★
()
Последнее исправление: Leron (всего исправлений: 2)
Ответ на: комментарий от true_admin

У меня количество может изменяться динамически, через файл конфигурации, без перезапуска. Конечно, нужно самостоятельно изменять переменную num_threads, после реального изменения количества. Неоднократно проверена работоспособность и по увеличению, и по уменьшению количества потоков - работает так, как нужно.

ximeric
()
Ответ на: комментарий от ximeric

через файл конфигурации, без перезапуска

А как ты это делаешь? Просто по SIGHUP перечитывать конфиг и обновлять переменные? Или есть какая-то волшебная либа?

true_admin ★★★★★
()
Ответ на: комментарий от true_admin

Проверка штампа времени файла раз в минуту, если изменился - перечитать.

ximeric
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.