LINUX.ORG.RU

Многопоточность. Помогите понять что не так.

 ,


2

5

Добрый день. Нужно реализовать обработку контейнеров в нескольких потоках. Набросал на коленке простой тестовый пример, но он работает не так, как ожидалось:

#include <cmath>
#include <vector>
#include <thread>
#include <iostream>
#include <functional>


int test(int x)
{
    return x * x;
}


void print(const std::vector<int>& v)
{
    for (const auto i : v)
    {
        std::cout << i << " ";
    }
    std::cout << std::endl;
}


void print(std::vector<int>::iterator begin, std::vector<int>::iterator end)
{
    auto it = begin;
    while (it != end)
    {
        std::cout << *it << " ";
        ++it;
    }
    std::cout << std::endl;
}


std::vector<int>::iterator for_each(std::vector<int>::iterator first, std::vector<int>::iterator last,
                                    std::vector<int>::iterator result, const std::function<int(int)>& mapFunctor)
{
    while (first != last)
    {
        *result = mapFunctor(*first);
        ++result;
        ++first;
    }
    return result;
}


std::vector<int>::iterator mapped(std::vector<int>::iterator first, std::vector<int>::iterator last,
                                  std::vector<int>::iterator result, const std::function<int(int)>& mapFunctor)
{
    auto availableThreads = std::thread::hardware_concurrency();
    if (availableThreads < 2)
    {
        for_each(first, last, result, mapFunctor);
    }
    else
    {
        const auto totalSize = std::distance(first, last);
        const auto blockSize = static_cast<size_t>(std::ceil(1.0 * totalSize / availableThreads));
        std::vector<std::thread> threads;
        std::vector<std::vector<int>> results;
        auto current = first;
        for (unsigned int i = 0; i < (availableThreads - 1); ++i)
        {
            auto blockStart = current;
            auto blockEnd = blockStart;
            std::advance(blockEnd, blockSize);
            current = blockEnd;
            results.emplace_back(std::vector<int>(blockSize));
            threads.emplace_back(
                    std::thread(for_each, std::ref(blockStart),
                                std::ref(blockEnd), results.at(i).begin(), mapFunctor));
        }
        result = for_each(current, last, result, mapFunctor);
        for (unsigned int i = 0; i < threads.size(); ++i)
        {
            if (threads[i].joinable())
            {
                threads[i].join();
            }
            print(results.at(i));
            result = std::copy(results.at(i).begin(), results.at(i).end(), result);
        }
    }
    return result;
}


int main()
{
    std::vector<int> v;
    for (unsigned int i = 1; i < 41; ++i)
    {
        v.emplace_back(i);
    }
    std::vector<int> v2;
    v2.resize(v.size());
    print(v);
    mapped(v.begin(), v.end(), v2.begin(), test);
    print(v2);
    return 0;
}
В итоге я получаю примерно такой вывод:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
121 144 169 196 225
121 144 169 196 225
961 1024 1089 1156 1225
676 729 784 841 900
961 1024 1089 1156 1225
961 1024 1089 1156 1225
961 1024 1089 1156 1225
1296 1369 1444 1521 1600 121 144 169 196 225 121 144 169 196 225 961 1024 1089 1156 1225 676 729 784 841 900 961 1024 1089 1156 1225 961 1024 1089 1156 1225 961 1024 1089 1156 1225
Первая строка — исходный вектор, со второй по восьмую — то, что получается в потоках, в девятой — результирующий вектор. Собирал с gcc и clag. Результат примерно одинаковый. Ради интереса попробовал собрать и запустить в VS, и все отработало как и задумывалось. Что я не учел?

Вектор results у тебя битый. emplace_back может привести к переаллокации, так что те итераторы, что ты раньше передавал на него в std::thread, становятся невалидны.

Это как минимум, я особо пристально не смотрел

yoghurt ★★★★★ ()
Ответ на: комментарий от yoghurt

Это главное. Ещё тред для хвоста не создаётся, хотя судя по (availableThreads - 1) он планировался.
P.S. Кажется ЛОР не место для бесплатной помощи по домашним заданиям.

snizovtsev ★★★★ ()

std::ref

Эти переменные умирают.

results.emplace_back(std::vector<int>(blockSize));

Уже сказали, что надо выделить заранее, работать то будет, но это скорее случайность.

result = for_each(current, last, result, mapFunctor);

Обрабатывается последняя часть массива, а записывается в выходной результат первым.

xaizek ★★★★★ ()
Ответ на: комментарий от snizovtsev

Тред для хвоста и не нужен. Как раз таки тредов создается на один меньше, чем позволяет система. Последний чанк обрабатывается основным потоком.
P.S. А с чего вывод, что это домашнее задание?

baldman88 ()
Последнее исправление: baldman88 (всего исправлений: 1)
Ответ на: комментарий от xaizek

std::ref

Эти переменные умирают.

Большое спасибо!
Мне не важен порядок на выходе. Поэтому, как уже написал выше, последний чанк обрабатывается в основном потоке и пишется в начало результирующего массива.

baldman88 ()
Ответ на: комментарий от snizovtsev

Ещё тред для хвоста не создаётся, хотя судя по (availableThreads - 1) он планировался.

Хвост в главном треде продолжает процесситься. Хороший прием, кстати, так ещё TBB делает

yoghurt ★★★★★ ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.