LINUX.ORG.RU

Разработка сетевой библиотеки на C++20: интеграция асинхронности и алгоритма Raft (часть 2)

 , , ,

Разработка сетевой библиотеки на C++20: интеграция асинхронности и алгоритма Raft (часть 2)

7

2

Эта статья является продолжением предыдущей публикации, в которой описывается разработка сетевой библиотеки на C++20. В данном продолжении акцент сделан на более детальном описании разработки алгоритма Raft и его интеграции с сетевой библиотекой.

Алгоритм Raft представляет собой ключевой компонент в архитектуре распределенных систем. Он применяется для обеспечения согласованности и надежности в условиях распределенных вычислений, позволяя системам эффективно управлять состоянием и принимать решения в условиях потенциальных сбоев и изменений в составе кластера.

Этот алгоритм особенно ценен для систем, где требуется высокая доступность и надежность, таких как распределенные базы данных, системы управления конфигурациями и многие другие области, где требуется координация действий между множеством узлов.

Основные понятия алгоритма Raft

В Raft, каждый участник имеет одну из трех ролей: Leader, Follower или Candidate. Участники обмениваются сообщениями и меняют свои роли в соответствии с определенными правилами. Например, все участники начинают как Followers, становятся Candidates, если не получают сообщения от Leader, и проводят голосование. Чтобы стать Leader, участник должен получить большинство голосов. Диаграмма состояний и переходов между ними доступна на стартовой картинке к статье.

В алгоритме Raft, каждый участник обладает состоянием, которое делится на два вида: постоянное (persistent) и изменяемое (volatile). Постоянное состояние включает в себя несколько ключевых полей:

  • CurrentTerm: это число, которое увеличивается при каждом новом выборе лидера.
  • VotedFor: участник хранит информацию о том, за кого он голосовал в последний раз.
  • Log: это распределенный лог, который позволяет участнику добавлять к своему состоянию данные для реализации сложных сущностей на основе алгоритма Raft, таких как распределенные базы данных.

На C++:

struct TState {
    uint64_t CurrentTerm = 1; 
    uint32_t VotedFor = 0;
    std::vector<TMessageHolder<TLogEntry>> Log; 
};

Изменяемое состояние (volatile) каждого участника включает важные поля:

  • CommitIndex: Индекс последней надёжно сохранённой записи в распределённом логе, подтверждённой большинством участников.
  • LastApplied: Индекс последней записи, применённой к состоянию машины, полезен для реализации сложных систем, таких как базы данных.
  • NextIndex и MatchIndex: Словари для отслеживания индексов логов, которые необходимо отправить на серверы (NextIndex) и индексы подтверждённых записей (MatchIndex).
  • Votes: Множество голосов, собранных от узлов.
  • HeartbeatDue и RpcDue: Словари для управления временем отправки сообщений и таймаутов RPC.
  • ElectionDue: Время начала нового голосования, используется на Follower’ах для перехода в состояние Candidate и начала голосования.

На языке C++:

using TTime = std::chrono::time_point<std::chrono::steady_clock>;

struct TVolatileState {
    uint64_t CommitIndex = 0; 
    uint64_t LastApplied = 0; 
    std::unordered_map<uint32_t, uint64_t> NextIndex;
    std::unordered_map<uint32_t, uint64_t> MatchIndex; 
    std::unordered_set<uint32_t> Votes; 
    std::unordered_map<uint32_t, TTime> HeartbeatDue; 
    std::unordered_map<uint32_t, TTime> RpcDue; 
    TTime ElectionDue; 
};

В алгоритме Raft используются два основных типа сообщений, каждый из которых имеет свой ответ:

  1. TAppendEntriesRequest: Используется лидером как для отправки Heartbeat-сообщений, так и для передачи новых записей Follower’ам. В ответ на это сообщение Follower отправляет TAppendEntriesResponse.

  2. TRequestVoteRequest: Используется кандидатами для запроса голосования. Сообщение содержит информацию, необходимую другим узлам для принятия решения о том, стоит ли отдать голос за отправителя сообщения. В ответ на этот запрос другие узлы отправляют TRequestVoteResponse.

Дополнительно к основным сообщениям, Leader может получать от клиентов (не участников кластера) специальные сообщения типа TCommandRequest. Эти сообщения используются для добавления новых записей в распределенный лог. Это позволяет клиентам вносить изменения в состояние системы, которые затем реплицируются на остальные узлы кластера. Ответ на TCommandRequest генерируется и отправляется только после того, как сообщение было надежно сохранено на большинстве участников (majority) кластера Raft.

На рисунке в начале статьи наглядно изображены типы сообщений, которые могут передаваться между участниками алгоритма Raft. Правила обработки сообщений описаны в raft.pdf в таблице на странице 4, я не буду подробно на них останавливаться.

API Библиотеки

В API библиотеки для алгоритма Raft представлены два ключевых компонента:

  1. INode: Этот интерфейс определяет методы Send (для отправки сообщений участникам, фактически добавляя их в буфер) и Drain (для использования и возможной отправки буферизованных сообщений):
struct INode {
    virtual ~INode() = default;
    virtual void Send(TMessageHolder<TMessage> message) = 0;
    virtual void Drain() = 0; 
};
  1. TRaft: Этот класс является основой, хранящей текущее состояние, стейты и ссылки на INode для взаимодействия с другими участниками. Он обрабатывает входящие сообщения и управляет таймаутами. Класс TRaft требует от пользователя библиотеки постоянной подачи новых сообщений и регулярного вызова ProcessTimeout для обработки таймаутов и переходов между состояниями:
class TRaft {
public:
    TRaft(uint32_t node, const std::unordered_map<uint32_t, std::shared_ptr<INode>>& nodes);
    void Process(TTime now, TMessageHolder<TMessage> message, const std::shared_ptr<INode>& replyTo = {});
    void ProcessTimeout(TTime now);
};

Метод Process обрабатывает каждое сообщение в соответствии с текущим состоянием узла и правилами алгоритма Raft.

Метод ProcessTimeout занимается обработкой таймаутов. В случае истечения таймаута Follower переходит в состояние Candidate и инициирует процесс голосования для выбора нового лидера. С другой стороны, Leader использует таймауты для регулярной отправки Heartbeat сообщений, подтверждая своё лидерство.

Структура сообщений

В моей архитектуре Raft все сообщения производны от базового типа TMessage, который содержит поля для типа (Type), длины (Len) и собственно данных (Value):

enum class EMessageType : uint32_t {
    NONE = 0,
    LOG_ENTRY = 1,
    REQUEST_VOTE_REQUEST = 2,
    REQUEST_VOTE_RESPONSE = 3,
    APPEND_ENTRIES_REQUEST = 4,
    APPEND_ENTRIES_RESPONSE = 5,
    COMMAND_REQUEST = 6,
    COMMAND_RESPONSE = 7,
};

struct TMessage {
    uint32_t Type;
    uint32_t Len;
    char Value[0];
};

Все сообщения, которые передаются между узлами, наследуются от TMessageEx, дополнительно включая поля для идентификации отправителя (Src), получателя (Dst) и текущего терма (Term):

struct TMessageEx: public TMessage {
    uint32_t Src = 0;
    uint32_t Dst = 0;
    uint64_t Term = 0;
};

Для примера рассмотрим сообщение TAppendEntriesRequest, которое в алгоритме Raft используется лидером для отправки запросов на добавление записей в лог. Оно включает в себя поля PrevLogIndex, PrevLogTerm, LeaderCommit, LeaderId и Nentries. Поля PrevLogIndex и PrevLogTerm используются для того, чтобы Follower могли верифицировать и принимать набор сообщений:

struct TAppendEntriesRequest: public TMessageEx {
    static constexpr EMessageType MessageType = EMessageType::APPEND_ENTRIES_REQUEST;
    uint64_t PrevLogIndex = 0;
    uint64_t PrevLogTerm = 0;
    uint64_t LeaderCommit = 0;
    uint32_t LeaderId = 0;
    uint32_t Nentries = 0;
};

В случае наличия записей, за сообщением следует payload, состоящий из Nentries сообщений типа TLogEntry, каждое из которых содержит терм и данные:

struct TLogEntry: public TMessage {
    static constexpr EMessageType MessageType = EMessageType::LOG_ENTRY;
    uint64_t Term = 1;
    char Data[0];
};

Остальные используемые сообщения:

struct TAppendEntriesResponse: public TMessageEx {
    static constexpr EMessageType MessageType = EMessageType::APPEND_ENTRIES_RESPONSE;
    uint64_t MatchIndex;
    uint32_t Success;
};

struct TRequestVoteRequest: public TMessageEx {
    static constexpr EMessageType MessageType = EMessageType::REQUEST_VOTE_REQUEST;
    uint64_t LastLogIndex;
    uint64_t LastLogTerm;
    uint32_t CandidateId;
};

struct TRequestVoteResponse: public TMessageEx {
    static constexpr EMessageType MessageType = EMessageType::REQUEST_VOTE_RESPONSE;
    uint32_t VoteGranted;
};

struct TCommandRequest: public TMessage {
    static constexpr EMessageType MessageType = EMessageType::COMMAND_REQUEST;
    char Data[0];
};

struct TCommandResponse: public TMessage {
    static constexpr EMessageType MessageType = EMessageType::COMMAND_RESPONSE;
    uint64_t Index;
};

Для удобства преобразования между базовым сообщением и конкретными типами сообщений используется структура-обертка TMessageHolder. Эта обертка содержит указатель на сообщение (Mes), сырые данные (RawData), размер payload (PayloadSize) и payload (Payload). Методы Cast и Maybe обеспечивают удобное и безопасное преобразование типов сообщений:

template<typename T>
requires std::derived_from<T, TMessage>
struct TMessageHolder {
    T* Mes;
    std::shared_ptr<char[]> RawData;

    uint32_t PayloadSize;
    std::shared_ptr<TMessageHolder<TMessage>[]> Payload;

    template<typename U>
    requires std::derived_from<U, T>
    TMessageHolder<U> Cast() {
        return TMessageHolder<U>(static_cast<U*>(Mes), RawData, PayloadSize, Payload);
    }

    template<typename U>
    requires std::derived_from<U, T>
    auto Maybe() { ... }
};

В коде TRaft, методы Cast и Maybe используются для определения типа сообщения и соответствующей обработки, что упрощает обработку входящих сообщений и улучшает читаемость кода:

void TRaft::Candidate(TTime now, TMessageHolder<TMessage> message) {
    if (auto maybeResponseVote = message.Maybe<TRequestVoteResponse>()) {
        OnRequestVote(std::move(maybeResponseVote.Cast()));
    } else if (auto maybeRequestVote = message.Maybe<TRequestVoteRequest>()) {
        OnRequestVote(now, std::move(maybeRequestVote.Cast()));
    } else if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>()) {
        OnAppendEntries(now, std::move(maybeAppendEntries.Cast()));
    }
}

void TRaft::Follower(TTime now, TMessageHolder<TMessage> message) {
    if (auto maybeRequestVote = message.Maybe<TRequestVoteRequest>()) {
        OnRequestVote(now, std::move(maybeRequestVote.Cast()));
    } else if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>()) {
        OnAppendEntries(now, std::move(maybeAppendEntries.Cast()));
    }
}

void TRaft::Leader(TTime now, TMessageHolder<TMessage> message, const std::shared_ptr<INode>& replyTo) {
    if (auto maybeAppendEntries = message.Maybe<TAppendEntriesResponse>()) {
        OnAppendEntries(std::move(maybeAppendEntries.Cast()));
    } else if (auto maybeCommandRequest = message.Maybe<TCommandRequest>()) {
        OnCommandRequest(std::move(maybeCommandRequest.Cast()), replyTo);
    } else if (auto maybeVoteRequest = message.Maybe<TRequestVoteRequest>()) {
        OnRequestVote(now, std::move(maybeVoteRequest.Cast()));
    } else if (auto maybeAppendEntries = message.Maybe<TAppendEntriesRequest>()) {
        OnAppendEntries(now, std::move(maybeAppendEntries.Cast()));
    }
}

В сетевой библиотеке чтение и запись сообщений реализованы простым и эффективным образом. Для записи используется метод Write класса TMessageWriter, который сначала записывает основное сообщение с помощью TByteWriter, а затем рекурсивно обрабатывает и записывает Payload:

template<typename TSocket>
TValueTask<void> TMessageWriter<TSocket>::Write(TMessageHolder<TMessage> message) {
    co_await TByteWriter(Socket).Write(message.Mes, message->Len);

    auto payload = std::move(message.Payload);
    for (uint32_t i = 0; i < message.PayloadSize; ++i) {
        co_await Write(std::move(payload[i]));
    }

    co_return;
}

Для чтения используется метод Read класса TMessageReader, который сначала читает тип и длину сообщения, а затем само сообщение и его Payload:

template<typename TSocket> 
TValueTask<TMessageHolder<TMessage>> TMessageReader<TSocket>::Read() {
    decltype(TMessage::Type) type;
    decltype(TMessage::Len) len;
    auto s = co_await Socket.ReadSome(&type, sizeof(type));
    if (s != sizeof(type)) {
        throw std::runtime_error("Connection closed");
    }
    s = co_await Socket.ReadSome(&len, sizeof(len));
    if (s != sizeof(len)) {
        throw std::runtime_error("Connection closed");
    }
    auto mes = NewHoldedMessage<TMessage>(type, len);
    co_await TByteReader(Socket).Read(mes->Value, len - sizeof(TMessage));
    auto maybeAppendEntries = mes.Maybe<TAppendEntriesRequest>();
    if (maybeAppendEntries) {
        auto appendEntries = maybeAppendEntries.Cast();
        auto nentries = appendEntries->Nentries;
        mes.InitPayload(nentries);
        for (uint32_t i = 0; i < nentries; i++) {
            mes.Payload[i] = co_await Read();
        }
    }
    co_return mes;
}

Класс TRaftServer отвечает за управление сетевыми соединениями и обработку сообщений для TRaft. Для каждого входящего соединения запускается корутина, которая читает сообщения и передает их в TRaft для обработки. Эта корутина также регулярно вызывает ProcessTimeout и DrainNodes:

    while (true) {
        auto mes = co_await TMessageReader(client->Sock()).Read();
        Raft->Process(std::chrono::steady_clock::now(), std::move(mes), client);
        Raft->ProcessTimeout(std::chrono::steady_clock::now());
        DrainNodes();
    }

DrainNodes запускает корутины отправки сообщений для каждой ноды.

Помимо этого, запускается отдельная корутина для обслуживания таймаутов, которая периодически обновляет состояние Raft, выполняет DrainNodes и может выводить отладочную информацию:

    while (true) {
        Raft->ProcessTimeout(std::chrono::steady_clock::now());
        DrainNodes();
        auto t1 = std::chrono::steady_clock::now();
        if (t1 > t0 + dt) {
            DebugPrint();
            t0 = t1;
        }
        co_await Poller.Sleep(t1 + sleep);
    }

Эти корутины обеспечивают непрерывную и эффективную обработку сетевых сообщений и таймаутов в системе.

TRaftServer является шаблонным классом, параметризованным типом сокета TSocket. Возможные варианты сокетов включают TSocket для обычных сетевых операций, TUringSocket для операций через интерфейс io_uring, и TSslSocket — обертку для SSL-соединений. Эта гибкая структура позволяет TRaftServer эффективно работать с различными типами сетевых соединений, обеспечивая широкий спектр возможностей для управления сетевыми взаимодействиями в рамках алгоритма Raft.

Тестирование

Класс TRaft разработан так, чтобы быть независимым от сети, что делает его особенно удобным для юнит-тестирования. Это позволяет разработчикам тестировать различные сценарии работы алгоритма Raft, не полагаясь на внешние сетевые компоненты. В качестве примера можно рассмотреть тестирование сценария 7f raft.pdf:

void test_follower_append_entries_7f(void**) {
    std::vector<TMessageHolder<TMessage>> messages;
    auto onSend = [&](const TMessageHolder<TMessage>& message) {
        messages.push_back(message);
    };
    auto ts = std::make_shared<TFakeTimeSource>();
    auto raft = MakeRaft(onSend, 3);
    raft->SetState(TState{
        .CurrentTerm = 1,
        .VotedFor = 2,
        .Log = MakeLog<TLogEntry>({1,1,1,2,2,2,3,3,3,3,3})
    });
    auto mes = NewHoldedMessage(TMessageEx {
        .Src = 2,
        .Dst = 1,
        .Term = 8,
    }, TAppendEntriesRequest {
        .PrevLogIndex = 3,
        .PrevLogTerm = 1,
        .LeaderCommit = 9,
        .LeaderId = 2,
        .Nentries = 7,
    });
    SetPayload(mes, MakeLog({4,4,5,5,6,6,6}));
    raft->Process(ts->Now(), mes);
    auto last = messages.back().Cast<TAppendEntriesResponse>();
    assert_true(last->Success);
    assert_true(last->MatchIndex = 10);
    assert_true(raft->GetState()->Log.size() == 10);
    assert_terms(raft->GetState()->Log, {1,1,1,4,4,5,5,6,6,6});
}

В этом примере юнит-теста реализована ситуация, когда узел в состоянии Follower получает сообщения TAppendEntriesRequest. Тест имитирует приходящие сообщения и проверяет логику обработки и изменения состояния узла в соответствии с правилами Raft. Важным аспектом здесь является использование TFakeTimeSource - контролируемого источника времени, который заменяет системные часы. Это позволяет точно симулировать таймауты и другие временные зависимости.

Тестирование задержки (latency) записи сообщений одним клиентом на кластер из трех и пяти узлов проведено для оценки производительности. Результаты тестирования для кластера из трех узлов показывают следующие перцентили задержки в наносекундах:

  • 50-й перцентиль (медиана): 292,872 нс
  • 80-й перцентиль: 407,561 нс
  • 90-й перцентиль: 569,164 нс
  • 99-й перцентиль: 40,279,001 нс

Эти результаты указывают на то, что большинство операций записи происходит с относительно низкой задержкой.

Для кластера из пяти узлов результаты тестирования задержки записи сообщений показывают следующие значения:

  • 50-й перцентиль (медиана): 425,194 нс
  • 80-й перцентиль: 672,541 нс
  • 90-й перцентиль: 1,027,669 нс
  • 99-й перцентиль: 38,578,749 нс

Эти данные свидетельствуют о том, что с увеличением количества узлов задержки записи также увеличиваются.

В оценке задержек, полученных в тестах моей Raft библиотеки, нет однозначного ответа на вопрос, хорошие они или плохие. Всё зависит от того, что важнее для вашей системы: быстродействие или надёжность. Для критически важных систем, где каждая микросекунда на счету, эти задержки могут показаться высокими. Но если в приоритете надёжность и согласованность данных, то миллисекундные задержки — это нормально. В конце концов, в распределённых системах, где требуется согласование данных между множеством узлов, такие показатели задержек часто являются приемлемым компромиссом.

Заключение

Подводя итог, хочется отметить, что созданная мной библиотека является ярким примером гибкого использования стандартных корутин C++20 для построения распределённых систем. Она несёт в себе основной функционал алгоритма Raft, управляя состояниями узлов и обрабатывая сообщения, но при этом оставляет простор для дальнейших улучшений и добавления продвинутых функций, таких как персистирование состояния на диск, снапшотинг и динамическое управление узлами.

Ссылки

Вот ссылки для дополнительной информации и материалов, упомянутых в статье:

  1. Предыдущая часть статьи: Разработка сетевой библиотеки на C++20: интеграция асинхронности и алгоритма Raft (часть 1))
  2. Основная статья об алгоритме Raft: Raft Consensus Algorithm
  3. Сетевая библиотека на GitHub: coroio
  4. Реализация Raft на GitHub: miniraft-cpp
★★★★★

Проверено: hobbit ()

Ответ на: комментарий от dataman

Уверен, что этих реализаций гораздо больше чем там перечислено.

Reset ★★★★★
() автор топика

Это круто конечно. Но такие вещи лучше на хабре писать. Тут их потом фиг найдешь.

О! Или лучше размести в md файлах на github в составе репозитория. Это ничего не стоит и будет сразу мини документация вместе с исходниками.

rumgot ★★★★★
()

То чувство, когда выхлоп ЛОР годнее выхлопа хабра! Автор, так держать.

PS а я заново переоткрыл для себя раздел Статьи.

hatred ★★★
()
Ответ на: комментарий от Werenter

А что статьи в какой то отдельной поисковой выдаче будут, а не вместе с остальными темами?

@maxcom

Кстати было бы очень удобно, если был отельный раздел «Статьи и руководства», например для таких тем, как текущая. И чтобы при поиске можно было искать только там.

rumgot ★★★★★
()
Последнее исправление: rumgot (всего исправлений: 2)
Ответ на: комментарий от Werenter

А. Понял. Сначала выполняем запрос. Потом, если среди результатов есть статьи, можно уже выбрать только их и требуемый раздел. Блин, это, мягко говоря, не интуитивно.

rumgot ★★★★★
()
Ответ на: комментарий от rumgot

@maxcom, я думаю что улучшение UI лишним не будет (тем более в движке нужный функционал уже есть).

Werenter ★★
()
Ответ на: комментарий от rumgot

Не а, не так. Нужный диалог (с выбором раздела) доступен, если поиск открывать не кнопкой «поиск», а из профиля любого пользователя кнопкой «комментарии».

Werenter ★★
()
Ответ на: комментарий от Werenter

Мой способ тоже работает. Но было бы хорошо, если бы в первоначальном диалоге поиска сразу можно было выбрать СТАТЬИ, т.е. до выполнения первого запроса.

rumgot ★★★★★
()
Последнее исправление: rumgot (всего исправлений: 2)

Спасибо за статью. Очень познавательно.

Не лез в исходники, дилетантский вопрос: я ж правильно понимаю, для обмена сообщениями используется UDP, не TCP?

blex ★★
()
Ответ на: комментарий от blex

для обмена сообщениями используется UDP, не TCP?

Используется либо голый TCP либо TLS поверх TCP. Для UDP надо будет разбираться много с чем.

На уровне моей сетевой библиотеки UDP поддерживается и данный функционал используется только в DNS-резолвере (да, этот функционал я добавил на этой неделе).

Reset ★★★★★
() автор топика
Ответ на: комментарий от Reset

У TCP ж таймауты… Как же это отрабатывает, если в сети начались реальные проблемы и ее плющит и колбасит. Придут через время старые сообщения, на которые уже никто не расчитывал. Когда в сети все хорошо, то да, микро и миллисекунды. Но этот же код как раз для того момента, когда поплохело…

Вопрос не к автору лично, и ни в коем случае не претензия. Возможно, я чего-то не понимаю.

blex ★★
()
Ответ на: комментарий от blex

Придут через время старые сообщения, на которые уже никто не расчитывал.

Это проблема ядра, а не пользовательского кода. Ядро заигнорит их на основании seqno.

Когда в сети все хорошо, то да, микро и миллисекунды.

Проблема в том, что протоколы консенсуса работают не быстро даже когда все хорошо, так как надо дождаться ack’ов с узлов. Когда сеть тормозит тогда все вообще плохо, я такой случай не рассматриваю и не тестировал его.

Reset ★★★★★
() автор топика
Ответ на: комментарий от Reset

Я неправильно понял область применения. Я решил, что это, например, для выбора активного сервера БД среди 3 работающих параллельно.

blex ★★
()
Ответ на: комментарий от blex

Такая область применения может быть, но основная область – реплицируемый лог, который может быть использован для реализации чего-угодно, например распределенного kv-хранилища.

Reset ★★★★★
() автор топика
1 февраля 2024 г.
Для того чтобы оставить комментарий войдите или зарегистрируйтесь.