LINUX.ORG.RU

c++: многопоточное чтение большого файла блоками

 , , ,


1

3

Всем привет! Хочу поинтересоваться у знатоков, как читать большой файл несколькими потоками поблочно. Всегда использовал потоки несколько тривиально: закидывал какой-то класс(qt) или функцию(std::thread) в отдельный поток, где-то дожидался ответа или писал в очередь (std::queue), и, собственно, никаких влажных фантазий на тему ускорения вычислений, таки, не совершал. Но тут появилась идея обрабатывать на лету большие куски файла, подумал, вот ведь она, родимая, многопоточность то. Ан нет, 4 дня сижу - херня получается.

Идея такова: разбиваю файл на куски - будь то размер гранулярности страниц в отображении файла или просто буффер; потом несколькими потоками (2-3 не важно), читаю кусками файл. Затык в том, что не понимаю, за что цепляться: пробовал отслеживать позицию в файле(перекидывать ее между потоками), такая же идея с отсчетом размера файла. Может, есть на эту тема статья хорошая или книженция? В интернете все мучают conditional_variable и примеры на двух функциях. Кто-то пишет, что надо просто открыть в каждом потоке этот файл разными дескрипторами и все будет работать. Может, это работает только с сишным подходом открытия файла open(), но ifstream отреагировал отрицательно. Задайте вектор по этой теме, если не трудно


Это ж только замедлит работу, читай в память, и потоками обрабатывай прочтенные куски.

insw ()
Ответ на: комментарий от insw

а нельзя читать блоками и обрабатывать? читал, одни говорят, типа, жесткий диск будет крутить башкой, и будет тормоз. другие говорят, ничего страшного, если только читать, у диска, мол, свои способы с этим бороться.

В общем, я сделал вариант с отображением, с сишным чтением, с срр потоками чтения, и если в отдельном потоке будет только обработка - по-моему, это как-то уныло для хваленой многопоточности

Tumyq ()
Последнее исправление: Tumyq (всего исправлений: 1 )

Сделай mmap() файла (если используешь Qt — QFile умеет из коробки в удобном виде), получишь доступ ко всему содержимому просто по адресу. Потом делишь файл на равные блоки и передаёшь обработчикам указатель на данные и размер блока, который они должны обработать. Ну или я не понял вопрос

XMs ★★★★★ ()

пробовал отслеживать позицию в файле(перекидывать ее между потоками)

Зачем? Расшарь интерфейс к файлу, прикрой мутексом, и используй из всех тредов. Один тред читает - другие ждут, закончил читать блок, кто-то проснулся и читает дальше.

Serral ()
Ответ на: комментарий от Tumyq

Непонятно, зачем тебе это нужно? Диск-то у тебя один и он команды отрабатывает последовательно. Да и производительность у него такая, что один поток бы загрузить — уже счастье. Мне кажется, что тебе правильно советуют читать одним потоком, а уже обрабатывать в несколько.

no-such-file ★★★★★ ()
Ответ на: комментарий от XMs

Тут qt не удобно, он не всегда будет. А так - да, согласен, там у него под капотом полно хорошего. Файл я делю, передаю, позиции, но че-т криво, видимо.

Tumyq ()
Ответ на: комментарий от Serral

Он в ifstream по умолчанию расширенный. Так и будет, похоже - читать по-очереди. Вопрос асинхронного чтения, видимо, отпадает.

Tumyq ()
Ответ на: комментарий от Tumyq

Он в ifstream по умолчанию расширенный

Мутекс надо, что не было data race.

Вопрос асинхронного чтения, видимо, отпадает.

Ну это изначально была неудачная идея. Параллелить надо то, что не блокирует друг друга. А чтения из разных тредов только будут насиловать диск и заставлять друг друга ждать.

Serral ()
Ответ на: комментарий от Tumyq

лучше

Для кого лучше? Thruput точно не будет лучше, но возможно будут меньше задержки поступления данных в треды-обработчики (но поступать они при этом будут медленнее).

no-such-file ★★★★★ ()
Ответ на: комментарий от Tumyq

Зависит от того, что ты с данными делаешь. Например, gzip часто работает со скоростью диска и параллелить его смысла нет (хотя и возможно, см pigz), а bzip2 намного более трудоёмок и прекрасно параллелится, см pbzip2.

В любом случае одновременное чтение несколько потоков работает тем хуже, чем меньше размеры порций. Тот же pbzip2 читает читает блоками немаленького размера.

legolegs ★★★★★ ()

Если хочешь многопоточное чтение то начни с приобретения железа которое это умеет. На нем так и пишут «Поддерживает 16 потоков чтения», или что-то в этом духе. Если денег хватит. Иначе это напрасная трата времени.

И, да, std::thread не имеет к многопоточному чтению никакого отношения. Тоесть ты конечно можешь отобразить потоки чтения на потоки исполнения но единственное что ты получишь - безсмысленное переусложнение кода.

cvv ★★★★★ ()
Ответ на: комментарий от XMs

А какой смысл ммапить? Данные в виртуальную память все равно будут считаться с диска по мере запросов, т.е. в итоге получится все то же самое, но с другим интерфейсом.

ТС, пока твой диск работает медленнее CPU, бить на потоки нет вообще никакого смысла.

filosofia ()
Ответ на: комментарий от Tumyq

Это 100%, что диск лучше многопоточно не использовать?

Это зависит от задачи. Если обработка считанных данных занимает времени сравнимо со временем чтения данных, то распараллеливание чтения данных не принесёт ускорения (не важно как оно реализовано через несколько потоков, процессов, машин), скорее будет замедление.

andalevor ★★ ()
Ответ на: комментарий от legolegs

прекрасно параллелится

Это опять же про обработку, а её про чтение, я так понимаю?

Tumyq ()
Последнее исправление: Tumyq (всего исправлений: 1 )
Ответ на: комментарий от filosofia

Мапить, чтобы буфер не использовать: взял указатель, и пошёл обрабатывать, пока страница не закончилась.

Tumyq ()
Ответ на: комментарий от filosofia

Именно ради интерфейса. Мне кажется, так меньше вероятность наворотить лишнего

XMs ★★★★★ ()

Вариант с mmap хорош.

Ещё вариант, если длительность обработки данных больше чтения. Один поток читает файл кусками и пишет в очередь, прикрытую мьютексом. Когда размер очереди достигает некой величины, поток засыпает на conditional_variable.

Произвольное количество потоков-воркеров берут из очереди данные на обработку. Тут в очереди нужно хранить указатель на данные, а не сами данные, чтобы можно было быстро взять мьютекс, забрать данные и отпустить мьютекс без копирования. Если в очереди нет данных или размер очереди меньше некой величины (чтобы в ноль не опусташать), то поток дёргает cond_var.

Короче классика.

ox55ff ★★★★★ ()
Ответ на: комментарий от ox55ff

Надеюсь, что я тоже, так сказать, к этой классике прильну. Похоже, картинка складывается для меня, спасибо! Переспрошу на всякий случай, я правильно понял, блоки считанных данных сохраняются в оперативку, следить надо за ее переполняемостью?

Tumyq ()
Ответ на: комментарий от Tumyq

блоки считанных данных сохраняются в оперативку, следить надо за ее переполняемостью?

Да, но не весь файл, а некоторое количество. т.е. поток читает 10 (например) блоков и засыпает. Другие потоки берут из очереди блоки на обработку и когда в очереди остаётся меньше 5 (например), то первый поток просыпается и снова наполняет очередь до 10 элементов. До нуля не надо опустошать очередь, чтобы всегда был запас и поток всегда мог без ожидания взять данные на обработку.

Но тут нужно всё хорошо рассчитать. Обрабатывающие потоки нужно кормить не маленькими кусочками, а большими. Чтобы они часто не ходили в очередь и не блокировались на мьютексе. Если обработка слишком простая, то не будет выигрыша от многопоточной схемы. Только лишняя сложность и потеря времени на возню с синхронизацией.

Ещё нужно уметь правильно завершать работу потоков. Т.к. поток может спать на conditional_variable. Т.е. перед тем как поток завершить, нужно его разбудить иначе словишь deadlock.

ox55ff ★★★★★ ()
Ответ на: комментарий от Tumyq

Это 100%, что диск лучше многопоточно не использовать? Ибо я видел на форумах баталии по этому поводу

Какой-нибудь NVMe SSD вполне можно.

Meyer ★★★★★ ()
Ответ на: комментарий от ox55ff

Да, но не весь файл, а некоторое количество. т.е. поток читает 10 (например) блоков и засыпает. Другие потоки берут из очереди блоки на обработку и когда в очереди остаётся меньше 5 (например), то первый поток просыпается и снова наполняет очередь до 10 элементов.

KISS.

#include <atomic>
#include <fstream>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
using namespace std;

int main() {
    ifstream f { "/usr/bin/[" };
    mutex m;
    atomic<long> count { 0 };

    auto reader = [&]
    {
        while(1) {
            char c; // Буфер

            {
                // Начинаем чтение, другие треды ждут
                lock_guard _ { m };

                // Читаем в буфер, если файл закончился - выходим, или выставляем флаг, чтоб выйти после обработки
                if( !f.get( c ) )
                    break;
            }

            // Обрабатываем данные
            count++;
            this_thread::sleep_for( chrono::nanoseconds( 1 ) );
        }
    };

    vector<thread> workers;
    for( int i = 0 ; i < 4 ; ++i )
        workers.push_back( thread( reader ) );

    for( auto& t : workers )
        t.join();    

    cout << count << endl;   
}
Serral ()
Ответ на: комментарий от ox55ff

Хорошо, про взаимодействие потоков я понял. Попробую сделать. Спасибо

Tumyq ()
Последнее исправление: Tumyq (всего исправлений: 1 )
Ответ на: комментарий от Serral

f.get( c ) как раз получает указатель из очереди данных и обрабатывает, так?

Tumyq ()
Последнее исправление: Tumyq (всего исправлений: 2 )
Ответ на: комментарий от XMs

Достаточно прозрачное решение проблемы, как по мне.

Nibbler ()
Ответ на: комментарий от Serral

Чтение файла из под мьютекса.

Да, если чтение быстрое. Иначе другие потоки будут ждать, пока один читает.

ox55ff ★★★★★ ()
Ответ на: комментарий от ox55ff

Да, если чтение быстрое. Иначе другие потоки будут ждать, пока один читает.

Если чтение медленное относительно обработки, то параллелить смысла нет, треды будут стоять и ждать пока им что-то вычитают. Максимум тут можно выжать сделав два треда, без всяких пулов.

Serral ()
Ответ на: комментарий от ox55ff

Это схема хороша, когда файл супер огромный и нет возможности его полностью загрузить в оперативную память.

Nibbler ()
Ответ на: комментарий от Serral

похоже, я что-то намудрил.


#include "thread_pool/ctpl.h"

struct HFREntry_par {   
    std::string fname_in;
    std::string fname_out;
    unsigned int block_size;
};

    static fs::ifstream *f_in;
    static fs::ofstream *f_out;
    static ctpl::thread_pool *t_pool; 
    static std::mutex mtx, mtx2, mtx3;   

    static size_t f_size = 0, block_size = 0;   

void MT_HFReaderImpl::init_f(const HFREntry_par &vars)
{
    std::string path = vars.fname_in;
    if (!fs::exists(path) || !fs::is_regular_file(path)) return;
    block_size = vars.block_size;
    f_in = new fs::ifstream(path, std::ios::binary | std::ios::ate); //файл на чтение
    f_size = static_cast<size_t>(f_in->tellg()) + 1; //размер файла
    f_in->seekg(0);
    f_out = new fs::ofstream(vars.fname_out, std::ios::binary); //файл на запись
    int core_n = static_cast<int>(std::thread::hardware_concurrency()); //заряжаем пул
    t_pool = new ctpl::thread_pool(core_n);
}

void MT_HFReaderImpl::loop_f()
{
     //сколько целых блоков и остаток
    if ( f_size <= 0 || block_size <= 0 ) return;

    auto reader = [&]
    {
        if (f_in == nullptr || f_out == nullptr || t_pool == nullptr) return;

        std::string buffer(unsigned(block_size), '\0'); // Буфер
        std::hash<std::string> hash_fn;

        {
            // Начинаем чтение, другие треды ждут
            std::unique_lock<std::mutex> lock(mtx);

            // Читаем в буфер
            if (f_size < block_size){
                block_size = f_size;
            }

            f_in->read(&buffer[0],int(block_size));

            f_size -= block_size;
            if(f_size <= 0){
                finish();
            }
        }

        // Обрабатываем данные
        size_t _hash = hash_fn(buffer);

        {
            // Пишем в выходной файл, другие треды ждут
            std::unique_lock<std::mutex> lock(mtx2);

            *f_out << std::to_string(_hash) + "\n";

            std::cout << "hash:" << _hash << '\n';
        }
    };

    while (!f_in->eof()){
        t_pool->push([&] (int id) { reader(); });
    }
}

void MT_HFReaderImpl::finish()
{
    std::unique_lock<std::mutex> lock(mtx3);
    f_in->close();
    delete f_in;
    f_in = nullptr;
    f_out->close();
    delete f_out;
    f_out = nullptr;
    delete t_pool;
    t_pool = nullptr;
}

void MT_HFReaderImpl::file_read( const HFREntry_par & vars )
{
    init_f(vars);
    loop_f();
}

int main(int argc, char *argv[])
{
    HFREntry_par vars;

    vars.fname_in = "input_file_name.txt";
    vars.fname_out = "output_file_name.txt";
    vars.block_size = 1024 * 1024;

    file_read( vars );
}


Tumyq ()
Последнее исправление: Tumyq (всего исправлений: 6 )
Ответ на: комментарий от Tumyq

следить надо за ее переполняемостью?

Слежение за переполняемостью оперативки это признак очень высокой квалификации. Обычные люди эту задачу доверяют операционной системе.

cvv ★★★★★ ()
Ответ на: комментарий от Tumyq

Для начала стоит разобраться с самыми основами. Пока ты просто копипастишь куски кода, не понимая, что они делают, ничего хорошего не получится.

Serral ()
Ответ на: комментарий от Tumyq

Когда чтение случайных блоков из устройства работает быстрее, чем последовательное (если такое возможно в принципе).

andalevor ★★ ()
Ответ на: комментарий от Tumyq

90%. Если твой конкретный диск умеет многопоток то можно использовать столько потоков, сколько умеет твой диск. Например на некоторых дисках стоит второй блок головок. В этом случае два потока с таким диском могут дать удвоение скорости чтения.

Плюс серверные NVMe должны уметь продвинутый многопоток.

cvv ★★★★★ ()
Ответ на: комментарий от Serral

Все дошло, что я натворил. Там же таски, а я… Отработало как надо, спасибо

«Что же мы наделали, Карл» :P

Tumyq ()
Последнее исправление: Tumyq (всего исправлений: 1 )
Ответ на: комментарий от Einstok_Fair

У меня и то, и то есть. .возможна ситуация, что hdd заменят на ssd

Tumyq ()

похоже, тут вопрос упирается в способность HDD/SSD в многопоточность - как вот это проверить - вот в чём вопрос.

anonymous ()
Ответ на: комментарий от anonymous

как вот это проверить - вот в чём вопрос.

  1. Почитать инструкцию

  2. Погуглить обзоры и тесты. Об этом пишут почти всегда

cvv ★★★★★ ()
Последнее исправление: cvv (всего исправлений: 1 )

если не ошибаюсь, io_uring в ядре запилили именно для эффективного асинхронного чтения с быстрых SSD. Я бы копал в эту сторону (при наличии соответствующего SSD, конечно)

anonymous ()

Я бы сделал один поток на последовательное чтение с диска и hardware_concurrency() воркеров на обработку. Читающий поток выделяет в два раза больше памяти чем количество_воркеров * размер_куска, по мере чтения отдает свободным воркерам кусок на обработку и свапается памятью с ним.

Kuzy ★★★ ()
Ответ на: комментарий от Kuzy

Да, так и буду делать. Очередь указателей на данные - лучшая идея

Tumyq ()
Ответ на: комментарий от anonymous

конечно, а потом надо будет сделать что-то другое, и будет «теперь снова пиши на с++». по моему скромному мнению с++ и Java - это надолго

Tumyq ()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.