LINUX.ORG.RU

Последовательный обход списка несколькими потоками

 ,


0

1

Приветствую всех.

Разбираюсь с многопоточностью в java, и пытаюсь решить следующую задачу.

Есть два списка, в одном хранятся некоторые объекты, в другом треды. На первом этапе, предположим, что объекты не изменяются, и список в котором они хранятся тоже не изменяется. Для ясности предположим, что у меня пять тредов и миллиард объектов в другом списке.

Мне нужно гарантировать последовательный обход элементов списка тредами, т.е.:

  1. первый элемент обрабатывается thread1 (остальные четыре ждут);
  2. thread1 переходит на 2ой элемент, thread2 обрабатывает 1ый (три ждут);
  3. thread1 на 3, thread2 на 2, thread3 на освободившийся 1 и т.д.

При этом потоки не должны «обгонять» друг друга — сначала по элементам проходит thread1, потом thread2 и т.д.

Synchronized гарантирует, что с некоторым куском кода будет работать только один поток. Мне нужно гарантировать, не просто доступ к объекту одного потока, а потока с конкретным номером.

Готового кода не прошу, подскажите по алгоритму решения, и какие примитивы синхронизации лучше использовать.

На втором этапе, планирую менять значения полей объектов в обрабатываемом списке, удалять элементы из списка и убивать потоки. Т.е. в какой-то момент может остаться работающими (например): thread2, thread3 и thread5, но последовательность обхода должна остаться та же (2 обходит все, затем 3, и затем 5).

Любые подсказки и ссылки категорически приветствую.


Сделай каждому потоку кроме первого свою блокирующуюся очередь. Пусть первый поток выбирает элементы, обрабатывает и перекидывает в очередь следующего потока. Каждый следующий после обработки должен перекидывать элементы в очередь следующего.

Тут могут возникнуть проблемы с переполнением очереди, в случае если потоки сильно разные по скорости обработки. Если это проблема, то стоит посмотреть в сторону чего-нибудь вроде akka streams.

maxcom ★★★★★ ()
Ответ на: комментарий от maxcom

Прочем нет, streams не нужен — я забыл что бывают ограниченные блокирующиеся очереди, можно просто задать лимит.

maxcom ★★★★★ ()

Еще я придумал более эффективный вариант — надо сделать каждому потоку кроме первого по семафору, который инициализировать нулем. Все треды кроме первого ожидают пермит из семафора, обрабатывают следующий элемент и выдают пермит следующему. Первый никого не ждет, а только обрабатывает и выдает пермиты второму.

maxcom ★★★★★ ()
Ответ на: комментарий от maxcom

Про инициализацию int permit = 0 у семафора разобрался.

Это разве будет гарантировать, что второй поток не обгонит первый?

И еще вопрос по второму этапу — прерываем поток (например 3) по некоторому условию. Как быть с его семафором?

И еще. Знаю, что в java.util.concurrent есть Condition. Я с ним не работал. Может пример какой-нибудь покажешь? К этой задаче его можно применить?

pol01 ()
Ответ на: комментарий от pol01

Это разве будет гарантировать, что второй поток не обгонит первый?

Да, у тебя каждый поток будет выдавать следующему пермит на обработку следующего элемента.

По второму этапу я бы подумал как переформулировать задачу на более подходящих для этого абстракциях. Сложную логику на потоках лучше самому не программировать. Посмотрите на Akka например, не знаю что сейчас для Java популярно.

Condition тебе не нужен, это слишком низкоуровневый примитив.

maxcom ★★★★★ ()
Ответ на: комментарий от maxcom

Да в том то всё и дело, что нужно использовать java.concurrent без сторонних библиотек.

pol01 ()
Ответ на: комментарий от maxcom

Нет, что-то с семафорами тяжко доходит. Можешь псевдокод набросать, хотя-бы для трех потоков. А дальше сам попробую додумать.

Вот идею с очередями я понял: Организуем цепочку: T1 -> Q1 -> T2 -> Q2 -> T3 и т.д.

t1 обработает e1 и отдаст его в q1, где t2 его подхватит.

t2 обрабатывает e1, отдает его в q2, затем t3 и ниже по цепочке.

Здесь порядок работы тредов явно задан. Причем любой тред можно убить, а при передаче элемента проверять t2.isInterrupted() и если поток прекратил работу, то элемент можно пробрасывать явно ниже по цепи.

С семафором никак не пойму мысль.

PS: Ладно, я уже спать. Завтра буду делать очереди. Если поможешь с семафором решить, то сравним результат. За подсказки спасибо!

pol01 ()

самый простой и правильный способ таких вещей это у каждого потока своя очередь и кто хочет и как хочет в неё пихает объекты, после отработки объекта поток из очереди объект удаляет - делать 30 минут если не шарить. самый важный плюс - твой алгоритм будем меняться т.к., я думаю, ты делаешь хрень, а любые сложные вещи будет чрезмерно переусложняться в процессе переделки алгоритма.

vtVitus ★★★★★ ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.