LINUX.ORG.RU

Разработка сетевой библиотеки на C++20: интеграция асинхронности и алгоритма Raft (часть 1)

 , , , ,

Разработка сетевой библиотеки на C++20: интеграция асинхронности и алгоритма Raft (часть 1)

7

2

Введение

С годами работы в области распределённых систем, я понял, что мой опыт не будет полным без реализации алгоритма Raft. Это осознание побудило меня к действию: я решил создать свою реализацию, используя асинхронные возможности C++20.

Задача стояла не из лёгких: мне требовалось разработать сетевую библиотеку, обходясь без громоздких решений вроде Boost или gRPC, создать эффективную библиотеку сериализации сообщений без использования таких тяжёлых инструментов, как protobuf, и реализовать алгоритм Raft таким образом, чтобы он был независим от сетевой инфраструктуры и поддавался простому тестированию через юнит-тесты.

В этой статье я поделюсь своим опытом создания сетевой библиотеки на основе корутин C++20.

Реализация EchoServer и EchoClient

Первым шагом в создании сетевой библиотеки была реализация простого EchoServer и EchoClient. EchoClient соединяется с EchoServer и отправляет ему сообщения, на которые сервер отвечает тем же текстом. Взаимодействие клиента и сервера можно увидеть на примере:

$ ./echoclient
test
Received: test

message
Received: message

yet another message
Received: yet another message

$ ./echoserver
Received: test

Received: message

Received: yet another message

Код EchoClient

Целью было сделать код клиента максимально простым. Пример реализации:

TSimpleTask client(TLoop* loop)
{
    char out[128] = {0};
    char in[128] = {0};
    ssize_t size = 1;

    try {
        TSocket input{TAddress{}, 0 /* stdin */, loop->Poller()};
        TSocket socket{TAddress{"127.0.0.1", 8888}, loop->Poller()};

        co_await socket.Connect();
        while (size && (size = co_await input.ReadSome(out, sizeof(out)))) {
            co_await socket.WriteSome(out, size);
            size = co_await socket.ReadSome(in, sizeof(in));
            std::cout << "Received: " << std::string_view(in, size) << "\n";
        }
    } catch (const std::exception& ex) {
        std::cout << "Exception: " << ex.what() << "\n";
    }
    loop->Stop();
    co_return;
}

где TSimpleTask - тривиальная корутина.

Код EchoServer

Аналогично, код сервера был разработан для простоты и эффективности:

TSimpleTask client_handler(TSocket socket, TLoop* loop) {
    char buffer[128] = {0}; ssize_t size = 0;

    try {
        while ((size = co_await socket.ReadSome(buffer, sizeof(buffer))) > 0) {
            std::cerr << "Received: " << std::string_view(buffer, size) << "\n";
            co_await socket.WriteSome(buffer, size);
        }
    } catch (const std::exception& ex) {
        std::cerr << "Exception: " << ex.what() << "\n";
    }
    co_return;
}

TSimpleTask server(TLoop* loop)
{
    TSocket socket(TAddress{"0.0.0.0", 8888}, loop->Poller());
    socket.Bind();
    socket.Listen();

    while (true) {
        auto client = co_await socket.Accept();
        client_handler(std::move(client), loop);
    }
    co_return;
}

Реализация Awaitable в C++20

C++20, хотя и предлагает достаточно низкоуровневый API для реализации корутин, позволяет сократить сложность, предоставляя механизм Awaitable. Awaitable управляет приостановкой и возобновлением корутин, используя coroutine_handle, который ожидает активации на основе механизмов поллинга вроде select или poll.

Awaitable определяет три основных метода: await_ready, который проверяет, готова ли корутина к выполнению, await_suspend, который приостанавливает корутину и связывает её с механизмом поллинга, и await_resume, который возобновляет выполнение корутины после того, как ожидаемое событие произошло. Эта концепция облегчает управление асинхронными операциями, делая код более читаемым и эффективным.

В данном примере метод ReadSome сокета реализован с использованием механизма Awaitable в C++20:

 auto ReadSome(char* buf, size_t size) {
    struct TAwaitable  {
        bool await_ready() {
            Run();
            return ready = (ret >= 0);
        }

        int await_resume() {
            if (!ready) { Run(); }
            return ret;
        }

        void Run() {
            ret = read(fd, b, s);
            if (ret < 0 && !(err == EINTR||err==EAGAIN||err==EINPROGRESS)) {
                throw std::system_error(errno, std::generic_category(), "read");
            }
        }

        void await_suspend(std::coroutine_handle<> h) {
            poller->AddRead(fd, h);
        }

        TSelect* poller;
        int fd;
        char* b; size_t s;
        int ret;
        bool ready;
    };
    return TAwaitable{Poller_,Fd_,buf,size};
}

Структура TAwaitable определяет необходимые функции для управления асинхронным выполнением. Если чтение данных с сокета невозможно, корутина приостанавливается (await_suspend), и событие добавляется в поллер. Как только сокет готов к чтению, вызывается resume на coroutine_handle, что приводит к возобновлению корутины (await_resume) и продолжению её выполнения.

Для реализации более сложных вещей, например, чтобы с помощью ReadSome сделать Read, который читает точное число байт из сокета, нужно уметь вызывать одну корутину из другой корутины и получать результат:

template<typename T, typename TSocket>
struct TStructReader {
    TStructReader(TSocket& socket)
        : Socket(socket)
    { }

    TValueTask<T> Read() {
        T res;
        size_t size = sizeof(T);
        char* p = reinterpret_cast<char*>(&res);
        while (size != 0) {
            auto readSize = co_await Socket.ReadSome(p, size);
            if (readSize == 0) {
                throw std::runtime_error("Connection closed");
            }
            if (readSize < 0) {
                continue; // retry
            }
            p += readSize;
            size -= readSize;
        }
        co_return res;
    }

private:
    TSocket& Socket;
};

Эта структура будет использоваться так:

auto result = co_await TStructReader<TType>(socket).Read();

Структура TStructReader использует метод ReadSome для асинхронного чтения, обрабатывая ситуации закрытого соединения и необходимость повтора чтения. Чтобы достичь этого, корутина, возвращаемая TStructReader::Read, должна быть одновременно Awaitable. Это обеспечивает возможность приостановки вызывающей корутины и её возобновления после получения результата. Для обеспечения данного поведения в await_suspend мы будем прикапывать корутину, которая нас вызывает, в final_suspend вызываемой корутины мы будем пробуждать вызывающую корутину и получать результат. Полный код подобной корутины (TValueTask<T>) можно найти на GitHub: coroio/corochain.hpp.

Расширенные Возможности Сетевой Библиотеки

С помощью механизма вызова корутин по цепочке, я расширил сетевую библиотеку, реализовав чтение и запись структур данных, построчное чтение, а также поддержку SSL сокетов с использованием цепочек вызовов корутин. Пример echoclient с построчным чтением демонстрирует эффективность и гибкость подхода:

    TFileHandle input{0, poller}; // stdin
    TSocket socket{std::move(addr), poller};
    TLineReader lineReader(input, maxLineSize);
    TByteWriter byteWriter(socket);
    TByteReader byteReader(socket);

    co_await socket.Connect();
    while (auto line = co_await lineReader.Read()) {
        co_await byteWriter.Write(line);
        co_await byteReader.Read(in.data(), line.Size());
        std::cout << "Received: " << std::string_view(in.data(), line.Size()) << "\n";
    }

Заключение

В заключение, архитектура разработанной мной библиотеки представлена на стартовой картинке статьи. Библиотека поддерживает множество механизмов полинга, включая select, poll, epoll, kqueue и uring. Хотя я не описываю протокол Raft в деталях (его можно найти в raft.pdf), я точно следовал его спецификации. Для сериализации сообщений использовалась техника чтения/записи запакованных структур в формате TLV. Пример сессии трёх серверов Raft и клиента демонстрирует функциональность:

$ ./server --id 1 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
Candidate, Term: 2, Index: 0, CommitIndex: 0,
Candidate, Term: 2, Index: 0, CommitIndex: 0,
Leader, Term: 3, Index: 0, CommitIndex: 0, Delay: 2:0 3:0 MatchIndex: 2:0 3:0 NextIndex: 2:1 3:1
Leader, Term: 3, Index: 0, CommitIndex: 0, Delay: 2:0 3:0 MatchIndex: 2:0 3:0 NextIndex: 2:1 3:1
Leader, Term: 3, Index: 0, CommitIndex: 0, Delay: 2:0 3:0 MatchIndex: 2:0 3:0 NextIndex: 2:1 3:1
...
Leader, Term: 3, Index: 1080175, CommitIndex: 1080175, Delay: 2:0 3:0 MatchIndex: 2:1080175 3:1080175 NextIndex: 2:1080176 3:1080176
....

$ ./server --id 2 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
Candidate, Term: 2, Index: 0, CommitIndex: 0,
Follower, Term: 3, Index: 0, CommitIndex: 0,
...
Follower, Term: 3, Index: 1080175, CommitIndex: 1080175,
...

$ ./server --id 3 --node 127.0.0.1:8001:1 --node 127.0.0.1:8002:2 --node 127.0.0.1:8003:3
...
Candidate, Term: 2, Index: 0, CommitIndex: 0,
Follower, Term: 3, Index: 0, CommitIndex: 0,
...
Follower, Term: 3, Index: 1080175, CommitIndex: 1080175,
...

$ dd if=/dev/urandom | base64 | pv -l | ./client --node 127.0.0.1:8001:1 >log1
 198k 0:00:03 [159.2k/s] [        <=>

Исходный код сетевой библиотеки доступен на GitHub: coroio, где также можно ознакомиться с сравнительными графиками производительности по сравнению с libevent. Код библиотеки Raft также доступен: miniraft-cpp.

★★★★★

Проверено: hobbit ()

Так получилось, что про реализацию самого Raft, и как она завязана со всем остальным, в статье почти ничего нет…

hobbit ★★★★★
()
Последнее исправление: hobbit (всего исправлений: 1)
Ответ на: комментарий от hobbit

Да, на него только ссылка в конце есть и пример запуска :) Надо вторую часть писать, и так много текста получилось.

Reset ★★★★★
() автор топика

Вторая часть будет скорее всего. Суперподроностей пистать не буду скорее всего. Для Raft нужна диаграмма перехода между стейтами. Его API (это 1 функция у меня :) ) и как связан Raft с сетевой библиотекой. Я на написание Raft потратил 4 дня по 2 часа работы. А вот на сетевую библиотеку у меня ушло очень много времени.

Reset ★★★★★
() автор топика
Ответ на: комментарий от dataman

А почему бы не https://github.com/boost-ext/ut?

cmocka я использую еще в своих pure-C проектах, cmocka легко ставится на любой платформе стандартным менеджером пакетов (в частности в ubuntu, homebrew, msys).

О boost/ut не слышал, но слово boost в названии меня уже напрягает, можно это считать религиозным предрассудком :)

Reset ★★★★★
() автор топика
Ответ на: комментарий от dataman

Ok. Я тогда гляну. Если оно без зависимостей, то можно притащить сабмодулем и даже лучше будет чем установка cmocka.

Reset ★★★★★
() автор топика

Целью было сделать код клиента максимально простым. Пример реализации:
TSimpleTask

Так а где этот TSimpleTask хотя бы на диаграмме? Что это за класс? Откуда читателю знать что это за на?

Xintrea ★★★★★
()
Ответ на: комментарий от Xintrea

Откуда читателю знать что это за на?

Он сходит на гитхаб и посмотрит. Это тривиальная корутина такого вида:

struct TSimpleTask : std::coroutine_handle<TPromise>
{
    using promise_type = ::TPromise;
};
 
struct TPromise
{
    TSimpleTask get_return_object() { return {TSimpleTask::from_promise(*this)}; }
    std::suspend_always initial_suspend() noexcept { return {}; }
    std::suspend_always final_suspend() noexcept { return {}; }
    void return_void() {}
    void unhandled_exception() {}
};

На диаграмме я указал только ключевые вещи.

Reset ★★★★★
() автор топика
Последнее исправление: Reset (всего исправлений: 1)
Ответ на: комментарий от Xintrea

Добавил ссылку на код. Ссылка на TValueTask<T> в статье уже была. Я написал почти текст второй части. Где расказано как именно делался Raft и связывается с сетевой библиотекой. Да, Raft можно и без сети сделать, но с сетью это более практическое решение.

Reset ★★★★★
() автор топика
Последнее исправление: Reset (всего исправлений: 2)
        auto client = co_await socket.Accept();

Почему тип auto? TSocket же… вообще откуда эта мода всовывать auto куда попало? Какова правильная идеология использования auto? Я просто недавно в примерах QT похожее видел…

aiqu6Ait ★★★★
()
Ответ на: комментарий от aiqu6Ait

Почему тип auto?

Потому что мне так нравится.

TSocket же…

Верно. На самом деле зависит от стиля, принятого в проекте. На одном из моих проектов слово auto было запрещено.

Reset ★★★★★
() автор топика
13 января 2024 г.

используя асинхронные возможности C++20.

Зачем? Какая-то мешанина из шаблонов и корутин. Чем старый-добрый callback не угодил?

while ((size = co_await socket.ReadSome(buffer, sizeof(buffer))) > 0) {

Вы действительно думаете, что это хорошо читается, что это образец для подражания, что это надо в статью помещать?

Вы понимаете, что если вы это делаете шаблонами, то код скопипастится столько раз, сколько разных типов вы будете использовать? Будет 200 структур, которые надо передавать, будет 200 копипаст всего ассинхронного движка, будет 500 структур, будет 500 копипаст. А теперь главный вопрос: зачем, когда можно только void* и размер получить?

Даже у ужасного Boost.ASIO нет такого кошмара.

zx_gamer ★★
()
Ответ на: комментарий от zx_gamer

Чем старый-добрый callback не угодил?

Тем что нечитабельно и неподдерживаемо.

Вы действительно думаете, что это хорошо читается, что это образец для подражания, что это надо в статью помещать?

Да.

Вы понимаете, что если вы это делаете шаблонами, то код скопипастится столько раз, сколько разных типов вы будете использовать? Будет 200

Я это понимаю. Но 200 не будет, там шаблонным параметром идет тип поллера, а их 5 вариантов. Если забить на io_uring, то можно воспользоваться TPollerBase и обойтись без шаблонов.

Reset ★★★★★
() автор топика
Ответ на: комментарий от Reset

Тем что нечитабельно и неподдерживаемо.

try_read(fd, buffer, geted_data_handler_cb);

Вы уверены?

А ещё нормальные текстовые редакторы умеют «прыгать» на определение функции.

Да.

Пока результат вызова неблокирующей асинхронной функции socket.ReadSome(buffer, sizeof(buffer)) возвращает некий размер, который записывается в переменную size, больший нуля…

Вы уверены?

Попробуйте do…while, код может стать сильно опрятнее.

И конечно, это лучше вынести в отдельную функцию. «Читать до конца» может пригодиться много где.

zx_gamer ★★
()
Ответ на: комментарий от zx_gamer

try_read(fd, buffer, geted_data_handler_cb); Вы уверены?

Абсолютно. Лапша получается из колбэков. Сейчас конечно придумали паттернов типа акторной модели и фьючей, но по сути это те же яйца вид в профиль. За 20 лет в разработке я такого кода уже наелся, спасибо, больше не хочу.

Пока результат вызова …

И?

Попробуйте do…while, код может стать сильно опрятнее.

Нет. do..while это антипаттерн.

И конечно, это лучше вынести в отдельную функцию.

Отдельные helper-функции есть. Есть TByteReader который читает сколько нужно и при необходимости поретраит. Есть TLineReader, который построчно читает и в конце возращает пустую строку.

Reset ★★★★★
() автор топика
Ответ на: комментарий от Reset

Нет. do..while это антипаттерн.

Чего? Это совершенно естественный подход к программированию. Зачем вместо маленького условия в конце писать сложное выражение с побочным эффектом в начале?

zx_gamer ★★
()
Ответ на: комментарий от zx_gamer

Так не получится, потому что результат надо проверить сразу. Лучше тогда два раза позвать ReadSome - одни раз вне цикла и второй раз внутри цикла, чтобы сложное условие не писать.

Reset ★★★★★
() автор топика

щас есть штука проще и моднее Raft: Viewstamped Replicated

lesopilorama
()
Ответ на: комментарий от aiqu6Ait

auto - это чтобы явно не писать тип руками. Идеология правильная, для того и придумано.

lesopilorama
()
Для того чтобы оставить комментарий войдите или зарегистрируйтесь.