LINUX.ORG.RU

Парсинг потока на пакеты сообщений - минимизировать копирование

 , , ,


0

5

Есть TCP соединение (т.е. поток байтов), из него эти байты читаются и разбиваются на сообщения-пакеты определённого протокола уровня приложения. Каждое сообщение имеет либо фиксированный заранее известный размер, либо в заголовке содержит длину. Максимальный размер пакета - N, заранее известен и ограничен.

Варианты реализации: 1) Пишем байты из сокета в буфер размером N, когда в буфере оказывается целое сообщение, вызываем функцию-обработчик этого сообщения. Недостатки - нужно целое сообщение в буфере. Допустим, в буфер пришло сообщение размером N/2, за ним сообщение размером N, но оно уже в буфер целиком не влазит. Поэтому, после обработки первого сообщения затираем его и перемещаем первый кусок следующего сообщения к началу буфера, используя memmove(). Затем читаем из сокета до победного конца. Мне здесь не нравится наличие memmove()

 
|----message1---||---mesage2-- 
0                            N

2) Выделяем буфер размером 2N. Читаем из сокета не больше N байт за раз. Если буфер заполняется на N или больше, читаем из сокета ровно столько, чтобы прочитать хвост последнего сообщения, размер хвоста известен из уже пришедшего заголовка. После этого пишем в буфер с начала. Здесь мне не нравится то, что recv() возможно придётся дёргать чаще мелкими порциями. Но вариант кажется оптимальным.

|----message1---||---mesage2----------|---------------
0                             N                      2N

3) Допускаем, что функции-обработчику не нужно целое сообщение в буфере, функция может хранить состояние и парсить сообщение по кусочкам, по мере прихода. Здесь недостаток в том, что поля сообщения придётся по байтам или по кусочкам копировать в отдельные буфера. Потому что в буфере чтения оно может не быть представлено в целом виде. В первых двух вариантах можно просто передавать указатели на поля внутри буфера чтения.

Что анонимный разум имеет сказать по этому поводу? Есть идеи|алгоритмы получше?

★★★★★

Кормим парсер побайтово, как это делают все нормальные люди.

Если протокол поддерживает разбиение на сообщения без разбора явно или не явно, то перед парсером можно поставить сплиттер.

Т.е. интерфейс будет выглядеть примерно так:


struct Parser {
  list<message> feed(vector<char> received);
}

Это DOM-like вариант, для случаев когда сообщения нужны целиком.

SAX-like вариант, он же потоковый может выглядеть примерно так:

struct Handler {
   void on_message(MessageHeader*);
   void on_field(Field*);
   // e.t.c
   void on_message_end(MessageFooter*);
}

struct Parser {
    void feed(vector<char> data, Handler& handler);
}

Фишка в том, что sax вариант в нормальном парсере у тебя одна фигня будет в реализации так или иначе, вопрос только какие требования к интерфейсу.

pon4ik ★★★★★
()
Последнее исправление: pon4ik (всего исправлений: 1)

Используй первый метод, mmap’ни свой буфер 2 раза рядом друг с другом, и ты получишь бесплатный «wraparound», никаких memmove.

https://en.wikipedia.org/w/index.php?title=Circular_buffer&oldid=588930355#Optimization

https://fgiesen.wordpress.com/2012/07/21/the-magic-ring-buffer/

Подправь код под memfd_create или что у тебя там.

anonymous
()
Ответ на: комментарий от pon4ik

Немного дополню. Основная идея этого подхода, в том, что для разбора каждого сообщения у тебя всё равно будет заводится контекст хранящий состояния разбора на данный момент, вопрос только где ты его хранишь - внутри парсера и копируешь в итоге в виде сообщения(по сути поддерево синтаксического дерева разбора всего потока) с каким то интерфейсом доступа к этому состоянию, либо передаёшь меньшие копии контекста в виде более мелких листьев дерева разбора клиенту API только в тех обработчиках на которые он подписан, а он уже ответсвенен за реакцию на эти события.

В первом варианте - проще писать итоговую бизнес логику, но сложнее оптимизировать работу с памятью, т.к. придётся обрабатывать аллокации. Во втором варианте сложнее писать логику, но аллокаций можно избежать вовсе, или сильно снизить их количество (зависит от протокола).

pon4ik ★★★★★
()
Ответ на: комментарий от anonymous

но идея конечно интересная

Harald ★★★★★
() автор топика

Не сразу понял суть проблемы, по факту ты спрашиваешь, делать ли тебе сплиттер и если и делать, то как, либо кормить поток парсеру и копировать данные в контекст, а ты хотел бы обойтись вообще без копий.

Т.е. я по факту предложил третий вариант.

pon4ik ★★★★★
()
Последнее исправление: pon4ik (всего исправлений: 1)

это называется стриминговый протокол, у всех есть в начале размер пейлоада который идет дальше[br]

anonymous
()
Ответ на: комментарий от pon4ik

То что предлагает пончик, это слишком сложно.

у тебя всё равно будет заводится контекст хранящий состояния разбора на данный момент

Нет, есть случаи, когда не обязательно хранить контекст состояния разбора. ИМХО, то что описывает ТС как раз тот случай.

Я предложу свой вариант. У него есть свои плюсы, и свои минусы. Но с точки зрения простоты и удобства ИМХО, самый лучший. Правда нужно иметь реализацию некоторой структуры данных - кольцевого буфера. Уверен ТС сам сможет осилить написание реализации.

У нас есть кольцевой буфер фиксированного размера, такого, чтобы в него гарантировано влезло хотя бы одно сообщение максимального размера.

По мере прихода данных по сети мы записываем байты в этот кольцевой буфер «в хвост». Потом пытаемся достать полное сообщение из «из головы» буфера. Т.е. сначала проверяем, что байт в буфере набралось на заголовок. Если набралось, то достаем заголовок без извлечения (т.е. операция Peek, а не Pop) и смотрим на поле размера. По полю размера определяем полный размер сообщения. Теперь смотрим, есть ли в кольцевом буфере все сообщение целиком. Если есть, то достаем и обрабатываем. Если не получилось достать полное сообщение в этот раз, то когда придут новые данные, попытаемся достать полное сообщение снова.

pathfinder ★★★★
()
Последнее исправление: pathfinder (всего исправлений: 1)
Ответ на: комментарий от pathfinder

Теперь смотрим, есть ли в кольцевом буфере все сообщение целиком. Если есть, то достаем и обрабатываем.

Оно может быть целиком, но сдвинуто от начала буфера, т.е сначала хвост сообщения, потом начало сообщения

Harald ★★★★★
() автор топика
Ответ на: комментарий от Harald

То, как хранятся данные в кольцевом буфере, пускай интересно только реализации этого самого буфера. У тебя есть функции Pop(), Peek(),Push(),GetSize(). Больше ничего интересовать не должно.

pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

У нас есть кольцевой буфер фиксированного размера, такого, чтобы в него гарантировано влезло хотя бы одно сообщение максимального размера.

У меня в TODO тоже есть подобная проблема с uIP. Памяти в обрез, счёт на байты. Но DMA контроллер на DSP ничего не знает про кольцевой буфер.

gag ★★★★★
()
Ответ на: комментарий от pathfinder

Только это значит, что данные напрямую из кольцевого буфера неюзабельны, нужно пакет вычитывать целиком из кольцевого буфера в линейный, чтобы получить непрерывный кусок памяти

Harald ★★★★★
() автор топика
Ответ на: комментарий от pathfinder

Pop()

Т.е. нужен ещё один целый буфер с копированием?

gag ★★★★★
()
Ответ на: комментарий от Harald

Это недостаток схем с промежуточной буферизацией. Да, в кольцевом буфере данные будут либо в одном непрерывном куске памяти, либо в двух непрерывных кусках памяти.

Я не понимаю, вам с этим данными ещё работать надо, т.е. приводить к удобному виду, что само собой подразумевает копирование.

pathfinder ★★★★
()
Ответ на: комментарий от pathfinder

Я не понимаю, вам с этим данными ещё работать надо, т.е. приводить к удобному виду, что само собой подразумевает копирование.

ну в первых двух вариантах можно передавать указатели на содержимое внутри буфера, во время выполнения функции-обработчика они будут валидными

Harald ★★★★★
() автор топика
Ответ на: комментарий от Harald

Я так понимаю, тебя не пугает перспектива обращаться по указателю на не выровненные данные, а экономия копирования стоит на первом месте. Тогда делай по второму варианту.

pathfinder ★★★★
()

2 вариант кажется мне наиболее простым

В случае большого потока сообщений его видимо можно и заоптимизировать выделив буфер размером x*N и читать из сокета не больше (x-1)*N байт за раз

alx777 ★★
()
Ответ на: комментарий от Harald

Как раз сущность которая разбивает поток на сообщения и кормит буфером в котором ровно одно сообщение парсер. Его не всегда возможно реализовать, но практически всегда. Вот реализовать его эффективно, без реалоков или избыточных обращений к i/o - сложно.

pon4ik ★★★★★
()
Ответ на: комментарий от pathfinder

По-сути второй вариант (у ТС) это как раз и есть кольцевой буфер векторов построенный на векторе(ну типа как куча на векторе, только кольцевой буфер). Недостаток по большому счёту только один - лишняя точка позволить планировщику себя усыпить. Но, судя по тому, что целевая платформа контроллер, там скорее всего какой нить rtos, и вероятность заснуть там где ты не собирался отсутсвует.

pon4ik ★★★★★
()
Ответ на: комментарий от pathfinder

Почитал дальше чуть - ты предлагаешь всё равно производить разбор сообщения и копировать его. Но тогда - любой буфер кроме того, который содержит текущие данные с сокета есть лишняя сущность, т.к. как раз проще сразу укладывать данные либо в представление новое либо уведомлять о событиях в потоке его подписчиков.

pon4ik ★★★★★
()

или по кусочкам копировать в отдельные буфера.

Есть место для нескольких? Тогда фантазии

Сделать N буферов . Например 4

  • Сообщение A пришло полностью в 1N -> отдаём обработчику -> очищаем буфер получаем следующее сообщение
  • Сообщение B пришло частично в 1N -> ну пришло и пришло в обработчик передавать пока нечего, устанавливаем в качестве текущего буфер 2N
  • Сообщение C пришло тоже частично в уже 2N ну опять мимо устанавливаем в качестве текущего буфер 3N
  • Сообщение B пришло теперь оно готово передаём обработчику типа 1N=240 3N=240 где 240 размеры, a 1N 2N последовательность откуда читать очищяем буферы
  • Сообщение D пришло полностью текущий N3 уже пустой - отдаём - очищаем
  • Сообщение X пришло в 3N частично переключаем на 2N
  • Сообщение Y пришло частично - да что же такое то! переключаем на 4N
  • Сообщение Z пришло опять частично … timestep D > X || Y || Z Сорян D но ты в пролёте стираем его и Текущий снова N3

Тоесть если идут ошмётки сообщения то мы их всё же храним в некотором колличестве, но если остатки куда то алё, а буферы для кусков кончились шлём нахрен самый старый.

P.S С этими xN ABCDXYZ я сам запустался, но смысл понятен наверное. Копировать ничего не надо но в обработчик надо отдавать список последовательности буферов с размерами что бы он их читал и получал по итогу целое сообщение всегда. Количество буферов подобрать как компромисс того сколько есть памяти и насколько часто приходят вместо целых сообщений ошмётки которые надо собирать

Я хз как это реализовывать, но мне прикольно показалось :D

LINUX-ORG-RU ★★★★★
()
Ответ на: комментарий от pon4ik

вообще целевая платформа с одной стороны контроллер, с другой линукс, желательно иметь одну реализацию на обе, чтоб меньше кода педалить

Harald ★★★★★
() автор топика
Ответ на: комментарий от LINUX-ORG-RU

сообщения идут последовательно, пока одно не придёт целиком, слеующее не может появиться. Ну или другая сторона шлёт неправильные сообщения и мы дропаем соединение

Harald ★★★★★
() автор топика

Возможно, вы идёте немного не с той стороны и/или смотрите на процесс не с той высоты.

Просто избавление от копирования в каком-то конкретном месте может породить существенные накладные расходы в другом. Один из антипаттернов - кормить парсер отдельными байтами, а не использовать SIMD (примеры №1, №2).

Однократное копирование может быть очень оправдано, особенно для сбора сообщений («выпрямления» данных), для простой и эффективной их последующей обработки.

В сценариях без user-mode TCP-стека (когда данные копируются из ядра), достаточно неплохо работает вариант кольцевого буфера:

  • Данные принимаются непосредственно в голову буфера, т.е. каждый read() их просто дописывает.
  • Перевод указателя записи из конца буфера в начало происходит на границе сообщения, когда до конца буфера остается места меньше максимального размера сообщения. Таким образом, все сообщения всегда лежат в памяти линейными кусками.
  • Данные передаются на обработку целым куском с хвоста буфера, без копирования. Хвост сдвигается после завершения обработки.
  • Если обработка асинхронная, то для каждого обрабатывающего треда (или отправленного на обработку сообщения), отслеживается свое значение «хвоста», а свободное место в буфере отслеживается по минимальному значению из всех «хвостов» находящихся в обработке. При необходимости можно организовать цепочку кольцевых буферов.

Эта схема даёт 0 лишних копирований, при условии что данные всё равно приходиться вычитывать/копировать из ядра.

Если хочется еще быстрее, то см Seastar.

Deleted
()
Последнее исправление: Deleted (всего исправлений: 1)

читать и обрабатывать блоками размером до MTU. Вместо массива байт будет массив(или список, от реализации) частично заполненных блоков. Операции копирования резко сокращаются, почти исчезают, но появляются трабл что важный кусок может быть «разрезан» между двумя блоками. Чтобы такого избегать неплохо знать протокол.

Посмотри как кернигановский streams сделан в плане работы с памятью. Может где-то в архивах ещё остались оргинальные публикации.

MKuznetsov ★★★★★
()

Каждое сообщение имеет либо фиксированный заранее известный размер,

Тогда проблемы же нет. Просто размер буфера должен быть кратен размеру сообщения.

либо в заголовке содержит длину.

Можно сначала читать заголовок, смотреть есть ли место и действовать по обстоятельствам (если нет, то запомнить заголовок на будущее).

Чувствую себя капитаном, но, вроде, это не озвучили, либо все поняли почему это не подходит.

xaizek ★★★★★
()
Ответ на: комментарий от Deleted

В сценариях без user-mode TCP-стека (когда данные копируются из ядра), достаточно неплохо работает вариант кольцевого буфера:

Данные принимаются непосредственно в голову буфера, т.е. каждый read() их просто дописывает. […]

Эта схема даёт 0 лишних копирований, при условии что данные всё равно приходиться вычитывать/копировать из ядра.

Внезапно наткнулся на (возможно первое) публичное описание обозначенного подхода, с примерами для recv(): https://web.archive.org/web/20121006080117/http://www.codeproject.com/Articles/3479/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist

Deleted
()
Вы не можете добавлять комментарии в эту тему. Тема перемещена в архив.